Add metrics to Kafka Source (#3118)
* Add metrics to Kafka Source

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed debug print statement

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing test case

Signed-off-by: Krishna Kondaka <[email protected]>

* Added total committed metric and fixed tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed number of committed records stat. Also fixed bug when acknowledgements enabled

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments. Fixed acknowledgements related bug

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed to use counters for records/bytes consumed metrics

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unused code

Signed-off-by: Krishna Kondaka <[email protected]>

* Added a metric for keeping track of number of consumers without any partitions assigned

Signed-off-by: Krishna Kondaka <[email protected]>

* Added unit test for KafkaTopicMetrics

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Aug 10, 2023
1 parent 37df5bc commit 44e2eaf
Showing 10 changed files with 557 additions and 58 deletions.
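For orientation before the diffs, here is a minimal sketch of the shape the new per-topic metrics holder might take. This is not the actual KafkaTopicMetrics added by this commit; only the constructor arguments, the register()/update() hooks, and the getters that appear in the diffs below are taken from the changes, while the metric names, the field layout, and the use of pluginMetrics.counter() for each counter are assumptions.

// Illustrative sketch only -- metric names and field layout are assumptions, not the PR's actual code.
import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class KafkaTopicMetrics {
    private final String topicName;
    private final Counter numberOfRecordsCommitted;
    private final Counter numberOfPositiveAcknowledgements;
    private final Counter numberOfNegativeAcknowledgements;
    private final Counter numberOfPollAuthErrors;
    private final Counter numberOfDeserializationErrors;
    private final Counter numberOfRecordsFailedToParse;
    private final Counter numberOfBufferSizeOverflows;

    public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) {
        this.topicName = topicName;
        // One counter per progress/failure signal, scoped to this topic (names are illustrative).
        this.numberOfRecordsCommitted = pluginMetrics.counter(topicName + ".numberOfRecordsCommitted");
        this.numberOfPositiveAcknowledgements = pluginMetrics.counter(topicName + ".numberOfPositiveAcknowledgements");
        this.numberOfNegativeAcknowledgements = pluginMetrics.counter(topicName + ".numberOfNegativeAcknowledgements");
        this.numberOfPollAuthErrors = pluginMetrics.counter(topicName + ".numberOfPollAuthErrors");
        this.numberOfDeserializationErrors = pluginMetrics.counter(topicName + ".numberOfDeserializationErrors");
        this.numberOfRecordsFailedToParse = pluginMetrics.counter(topicName + ".numberOfRecordsFailedToParse");
        this.numberOfBufferSizeOverflows = pluginMetrics.counter(topicName + ".numberOfBufferSizeOverflows");
    }

    public void register(final KafkaConsumer consumer) {
        // In the real class this also covers consumer-level stats (records/bytes consumed,
        // consumers without assigned partitions); omitted in this sketch.
    }

    public void update(final KafkaConsumer consumer) {
        // Periodic refresh of consumer-level metrics from the Kafka client; omitted in this sketch.
    }

    public Counter getNumberOfRecordsCommitted() { return numberOfRecordsCommitted; }
    public Counter getNumberOfPositiveAcknowledgements() { return numberOfPositiveAcknowledgements; }
    public Counter getNumberOfNegativeAcknowledgements() { return numberOfNegativeAcknowledgements; }
    public Counter getNumberOfPollAuthErrors() { return numberOfPollAuthErrors; }
    public Counter getNumberOfDeserializationErrors() { return numberOfDeserializationErrors; }
    public Counter getNumberOfRecordsFailedToParse() { return numberOfRecordsFailedToParse; }
    public Counter getNumberOfBufferSizeOverflows() { return numberOfBufferSizeOverflows; }
}

In the diffs below, one such instance is created per configured topic in KafkaSource.start() and shared by all consumer workers for that topic.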
@@ -119,6 +119,7 @@ public void setup() {
when(jsonTopic.getName()).thenReturn(testTopic);
when(jsonTopic.getGroupId()).thenReturn(testGroup);
when(jsonTopic.getWorkers()).thenReturn(1);
when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(jsonTopic.getAutoCommit()).thenReturn(false);
@@ -130,6 +130,7 @@ public void setup() {
when(plainTextTopic.getName()).thenReturn(testTopic);
when(plainTextTopic.getGroupId()).thenReturn(testGroup);
when(plainTextTopic.getWorkers()).thenReturn(1);
when(plainTextTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(plainTextTopic.getAutoCommit()).thenReturn(false);
@@ -7,7 +7,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,11 +18,11 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.avro.generic.GenericRecord;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -34,17 +33,20 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.HashMap;
import java.time.Instant;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import org.apache.commons.lang3.Range;

@@ -55,16 +57,13 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
static final String POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME = "positiveAcknowledgementSetCounter";
static final String NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME = "negativeAcknowledgementSetCounter";
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
private KafkaConsumer consumer= null;
private AtomicBoolean shutdownInProgress;
private final String topicName;
private final TopicConfig topicConfig;
private PluginMetrics pluginMetrics= null;
private MessageFormat schema;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;
@@ -74,10 +73,11 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private Set<TopicPartition> partitionsToReset;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Map<Integer, TopicPartitionCommitTracker> partitionCommitTrackerMap;
private final Counter positiveAcknowledgementSetCounter;
private final Counter negativeAcknowledgementSetCounter;
private List<Map<TopicPartition, Range<Long>>> acknowledgedOffsets;
private final boolean acknowledgementsEnabled;
private final Duration acknowledgementsTimeout;
private final KafkaTopicMetrics topicMetrics;
private long metricsUpdatedTime;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
@@ -86,29 +86,33 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final TopicConfig topicConfig,
final String schemaType,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics) {
KafkaTopicMetrics topicMetrics) {
this.topicName = topicConfig.getName();
this.topicConfig = topicConfig;
this.shutdownInProgress = shutdownInProgress;
this.consumer = consumer;
this.buffer = buffer;
this.topicMetrics = topicMetrics;
this.topicMetrics.register(consumer);
this.offsetsToCommit = new HashMap<>();
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.acknowledgedOffsets = new ArrayList<>();
this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout();
// If the timeout value is different from default value, then enable acknowledgements automatically.
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.partitionCommitTrackerMap = new HashMap<>();
this.partitionsToReset = new HashSet<>();
this.partitionsToReset = Collections.synchronizedSet(new HashSet<>());
this.schema = MessageFormat.getByMessageFormatByName(schemaType);
Duration bufferTimeout = Duration.ofSeconds(1);
this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
this.lastCommitTime = System.currentTimeMillis();
this.positiveAcknowledgementSetCounter = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME);
this.negativeAcknowledgementSetCounter = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME);
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
long min = offsetRange.getMinimum();
long max = offsetRange.getMaximum();
topicMetrics.getNumberOfRecordsCommitted().increment(max - min + 1);
if (Objects.isNull(offsetAndMetadata)) {
return;
}
@@ -121,37 +125,20 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
AcknowledgementSet acknowledgementSet =
acknowledgementSetManager.create((result) -> {
if (result == true) {
positiveAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
}
});
topicMetrics.getNumberOfPositiveAcknowledgements().increment();
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.add(offsets);
}
} else {
negativeAcknowledgementSetCounter.increment();
topicMetrics.getNumberOfNegativeAcknowledgements().increment();
offsets.forEach((partition, offsetRange) -> {
synchronized(partitionsToReset) {
partitionsToReset.add(partition);
}
partitionsToReset.add(partition);
});
}
}, acknowledgementsTimeout);
return acknowledgementSet;
}

double getPositiveAcknowledgementsCount() {
return positiveAcknowledgementSetCounter.count();
}

public <T> void consumeRecords() throws Exception {
try {
ConsumerRecords<String, T> records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
@@ -164,23 +151,25 @@ public <T> void consumeRecords() throws Exception {
iterateRecordPartitions(records, acknowledgementSet, offsets);
if (!acknowledgementsEnabled) {
offsets.forEach((partition, offsetRange) ->
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1)));
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange));
} else {
acknowledgementSet.complete();
}
}
} catch (AuthenticationException e) {
LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e);
LOG.warn("Authentication error while doing poll(). Will retry after 10 seconds", e);
topicMetrics.getNumberOfPollAuthErrors().increment();
Thread.sleep(10000);
} catch (RecordDeserializationException e) {
LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
topicMetrics.getNumberOfDeserializationErrors().increment();
consumer.seek(e.topicPartition(), e.offset()+1);
}
}

private void resetOrCommitOffsets() {
synchronized(partitionsToReset) {
private void resetOffsets() {
if (partitionsToReset.size() > 0) {
partitionsToReset.forEach(partition -> {
try {
final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
@@ -191,9 +180,35 @@ private void resetOrCommitOffsets() {
});
partitionsToReset.clear();
}
}

void processAcknowledgedOffsets() {
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.forEach(offsets -> {
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange);
} catch (Exception e) {
LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e);
}
});
});
acknowledgedOffsets.clear();
}
}

private void commitOffsets() {
if (topicConfig.getAutoCommit()) {
return;
}
processAcknowledgedOffsets();
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
@@ -221,10 +236,12 @@ public void run() {
consumer.subscribe(Arrays.asList(topicName));
while (!shutdownInProgress.get()) {
try {
resetOrCommitOffsets();
resetOffsets();
commitOffsets();
consumeRecords();
topicMetrics.update(consumer);
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic...", exp);
LOG.error("Error while reading the records from the topic {}", topicName, exp);
}
}
}
@@ -251,6 +268,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
}
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record", e);
topicMetrics.getNumberOfRecordsFailedToParse().increment();
}
if (!plainTextMode) {
if (!(value instanceof Map)) {
@@ -293,7 +311,15 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
while (true) {
try {
bufferAccumulator.add(record);
break;
} catch (SizeOverflowException e) {
topicMetrics.getNumberOfBufferSizeOverflows().increment();
Thread.sleep(100);
}
}
}
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
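A design note on the consumer changes above: KafkaConsumer instances are not thread-safe, so the acknowledgment callback now only records completed offset ranges (and the partitions that need a reset), while processAcknowledgedOffsets() and resetOffsets() perform the consumer and commit-tracker work from the polling thread in run(). A stripped-down sketch of that hand-off, using hypothetical names (AcknowledgedOffsetsHandoff, updateCommitTracker), might look like the following:

// Simplified illustration of the hand-off pattern; class and helper names are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Range;
import org.apache.kafka.common.TopicPartition;

class AcknowledgedOffsetsHandoff {
    // Completed ranges recorded by the acknowledgment callback, drained by the polling thread.
    private final List<Map<TopicPartition, Range<Long>>> acknowledgedOffsets = new ArrayList<>();

    // Runs on the acknowledgment callback thread: only record what completed, touch nothing else.
    void onPositiveAcknowledgement(final Map<TopicPartition, Range<Long>> offsets) {
        synchronized (acknowledgedOffsets) {
            acknowledgedOffsets.add(offsets);
        }
    }

    // Runs on the polling thread (the run() loop), where the KafkaConsumer may be used safely.
    void drainCompletedOffsets() {
        synchronized (acknowledgedOffsets) {
            acknowledgedOffsets.forEach(offsets -> offsets.forEach(this::updateCommitTracker));
            acknowledgedOffsets.clear();
        }
    }

    // Hypothetical stand-in for the per-partition commit-tracker update done by the real consumer.
    private void updateCommitTracker(final TopicPartition partition, final Range<Long> offsetRange) {
        // ...
    }
}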
@@ -21,7 +21,7 @@ public class TopicPartitionCommitTracker {

public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) {
this.topicPartition = topicPartition;
this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset : -1L;
this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset-1 : -1L;
this.offsetMaxMap = new HashMap<>();
this.offsetMinMap = new HashMap<>();
this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset));
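The one-line change above (seeding the tracker at committedOffset-1 instead of committedOffset) follows Kafka's convention that the committed offset is the next offset to be consumed, so the last fully processed offset is one lower. A hypothetical usage example, not taken from the PR's tests and assuming addCompletedOffsets() returns the next offset to commit once the completed ranges are contiguous with the committed position:

import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
// TopicPartitionCommitTracker import omitted; this assumes the class is visible from here.

class CommitTrackerOffsetExample {
    public static void main(String[] args) {
        TopicPartition partition = new TopicPartition("test-topic", 0);
        // Kafka reports a committed offset of 5: offsets 0..4 are committed, 5 is the next to consume.
        TopicPartitionCommitTracker tracker = new TopicPartitionCommitTracker(partition, 5L);
        // Offsets 5..9 have since been processed and acknowledged.
        OffsetAndMetadata next = tracker.addCompletedOffsets(Range.between(5L, 9L));
        // With the fix the tracker seeds its completed position at 4, so the range starting at 5 is
        // contiguous with it and the next offset to commit should be 10; before the fix the committed
        // offset itself (5) was treated as already completed, an off-by-one.
        System.out.println(next.offset()); // expected: 10
    }
}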
@@ -45,6 +45,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
@@ -118,12 +119,13 @@ public void start(Buffer<Record<Event>> buffer) {
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
Properties consumerProperties = getConsumerProperties(topic, authProperties);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
int numWorkers = topic.getWorkers();
executorService = Executors.newFixedThreadPool(numWorkers);
IntStream.range(0, numWorkers + 1).forEach(index -> {
IntStream.range(0, numWorkers).forEach(index -> {
switch (schema) {
case JSON:
kafkaConsumer = new KafkaConsumer<String, JsonNode>(consumerProperties);
Expand All @@ -136,7 +138,7 @@ public void start(Buffer<Record<Event>> buffer) {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
break;
}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, pluginMetrics);
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
executorService.submit(consumer);
});
} catch (Exception e) {
@@ -212,7 +212,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
try {
result = kafkaClient.getBootstrapBrokers(request);
} catch (KafkaException | StsException e) {
LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e);
LOG.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e);
try {
Thread.sleep(10000);
} catch (InterruptedException exp) {}