Add metrics to Kafka Source #3118

Merged
merged 11 commits on Aug 10, 2023
@@ -119,6 +119,7 @@ public void setup() {
when(jsonTopic.getName()).thenReturn(testTopic);
when(jsonTopic.getGroupId()).thenReturn(testGroup);
when(jsonTopic.getWorkers()).thenReturn(1);
when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(jsonTopic.getAutoCommit()).thenReturn(false);
@@ -130,6 +130,7 @@ public void setup() {
when(plainTextTopic.getName()).thenReturn(testTopic);
when(plainTextTopic.getGroupId()).thenReturn(testGroup);
when(plainTextTopic.getWorkers()).thenReturn(1);
when(plainTextTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(plainTextTopic.getAutoCommit()).thenReturn(false);
@@ -7,7 +7,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,11 +18,11 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.avro.generic.GenericRecord;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
@@ -34,17 +33,20 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.HashMap;
import java.time.Instant;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import org.apache.commons.lang3.Range;

@@ -55,16 +57,13 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
static final String POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME = "positiveAcknowledgementSetCounter";
static final String NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME = "negativeAcknowledgementSetCounter";
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
private KafkaConsumer consumer= null;
private AtomicBoolean shutdownInProgress;
private final String topicName;
private final TopicConfig topicConfig;
private PluginMetrics pluginMetrics= null;
private MessageFormat schema;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final Buffer<Record<Event>> buffer;
@@ -74,10 +73,11 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private Set<TopicPartition> partitionsToReset;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Map<Integer, TopicPartitionCommitTracker> partitionCommitTrackerMap;
private final Counter positiveAcknowledgementSetCounter;
private final Counter negativeAcknowledgementSetCounter;
private List<Map<TopicPartition, Range<Long>>> acknowledgedOffsets;
private final boolean acknowledgementsEnabled;
private final Duration acknowledgementsTimeout;
private final KafkaTopicMetrics topicMetrics;
private long metricsUpdatedTime;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
@@ -86,29 +86,33 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final TopicConfig topicConfig,
final String schemaType,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics) {
KafkaTopicMetrics topicMetrics) {
this.topicName = topicConfig.getName();
this.topicConfig = topicConfig;
this.shutdownInProgress = shutdownInProgress;
this.consumer = consumer;
this.buffer = buffer;
this.topicMetrics = topicMetrics;
this.topicMetrics.register(consumer);
this.offsetsToCommit = new HashMap<>();
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.acknowledgedOffsets = new ArrayList<>();
this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout();
// If the timeout value is different from default value, then enable acknowledgements automatically.
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.partitionCommitTrackerMap = new HashMap<>();
this.partitionsToReset = new HashSet<>();
this.partitionsToReset = Collections.synchronizedSet(new HashSet<>());
this.schema = MessageFormat.getByMessageFormatByName(schemaType);
Duration bufferTimeout = Duration.ofSeconds(1);
this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
this.lastCommitTime = System.currentTimeMillis();
this.positiveAcknowledgementSetCounter = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_METRIC_NAME);
this.negativeAcknowledgementSetCounter = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_METRIC_NAME);
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
long min = offsetRange.getMinimum();
long max = offsetRange.getMaximum();
topicMetrics.getNumberOfRecordsCommitted().increment(max - min + 1);
if (Objects.isNull(offsetAndMetadata)) {
return;
}
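The KafkaTopicMetrics class used throughout this consumer is added elsewhere in the PR and is not shown in this section. For context, a minimal sketch of what such a per-topic metrics wrapper could look like follows, assuming it wraps Micrometer counters created through PluginMetrics. The constructor signature and getter names match the calls in this diff; the metric name strings and the register()/update() bodies are illustrative assumptions, not the actual implementation.

import io.micrometer.core.instrument.Counter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import java.util.Map;

// Sketch only: getter names mirror the calls made by KafkaSourceCustomConsumer above;
// the metric name strings and the register()/update() bodies are assumptions.
public class KafkaTopicMetrics {
    private final Counter numberOfRecordsCommitted;
    private final Counter numberOfPositiveAcknowledgements;
    private final Counter numberOfNegativeAcknowledgements;
    private final Counter numberOfPollAuthErrors;
    private final Counter numberOfDeserializationErrors;
    private final Counter numberOfRecordsFailedToParse;
    private final Counter numberOfBufferSizeOverflows;

    public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) {
        this.numberOfRecordsCommitted = pluginMetrics.counter(topicName + ".numberOfRecordsCommitted");
        this.numberOfPositiveAcknowledgements = pluginMetrics.counter(topicName + ".numberOfPositiveAcknowledgements");
        this.numberOfNegativeAcknowledgements = pluginMetrics.counter(topicName + ".numberOfNegativeAcknowledgements");
        this.numberOfPollAuthErrors = pluginMetrics.counter(topicName + ".numberOfPollAuthErrors");
        this.numberOfDeserializationErrors = pluginMetrics.counter(topicName + ".numberOfDeserializationErrors");
        this.numberOfRecordsFailedToParse = pluginMetrics.counter(topicName + ".numberOfRecordsFailedToParse");
        this.numberOfBufferSizeOverflows = pluginMetrics.counter(topicName + ".numberOfBufferSizeOverflows");
    }

    // Called once per consumer at construction; a real implementation would likely
    // remember the consumer here so gauges can be registered against its client metrics.
    public void register(final KafkaConsumer consumer) {
    }

    // Called once per poll loop; one plausible approach is to read selected values
    // from the Kafka client's own metrics and republish them.
    public void update(final KafkaConsumer consumer) {
        final Map<MetricName, ? extends Metric> kafkaMetrics = consumer.metrics();
        kafkaMetrics.forEach((metricName, metric) -> {
            // Selected values (e.g. "records-lag-max") could be read via
            // metric.metricValue() and republished through PluginMetrics gauges.
        });
    }

    public Counter getNumberOfRecordsCommitted() { return numberOfRecordsCommitted; }
    public Counter getNumberOfPositiveAcknowledgements() { return numberOfPositiveAcknowledgements; }
    public Counter getNumberOfNegativeAcknowledgements() { return numberOfNegativeAcknowledgements; }
    public Counter getNumberOfPollAuthErrors() { return numberOfPollAuthErrors; }
    public Counter getNumberOfDeserializationErrors() { return numberOfDeserializationErrors; }
    public Counter getNumberOfRecordsFailedToParse() { return numberOfRecordsFailedToParse; }
    public Counter getNumberOfBufferSizeOverflows() { return numberOfBufferSizeOverflows; }
}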
@@ -121,37 +125,20 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
AcknowledgementSet acknowledgementSet =
acknowledgementSetManager.create((result) -> {
if (result == true) {
positiveAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
}
});
topicMetrics.getNumberOfPositiveAcknowledgements().increment();
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.add(offsets);
}
} else {
negativeAcknowledgementSetCounter.increment();
topicMetrics.getNumberOfNegativeAcknowledgements().increment();
offsets.forEach((partition, offsetRange) -> {
synchronized(partitionsToReset) {
partitionsToReset.add(partition);
}
partitionsToReset.add(partition);
});
}
}, acknowledgementsTimeout);
return acknowledgementSet;
}

double getPositiveAcknowledgementsCount() {
return positiveAcknowledgementSetCounter.count();
}

public <T> void consumeRecords() throws Exception {
try {
ConsumerRecords<String, T> records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
@@ -164,23 +151,25 @@ public <T> void consumeRecords() throws Exception {
iterateRecordPartitions(records, acknowledgementSet, offsets);
if (!acknowledgementsEnabled) {
offsets.forEach((partition, offsetRange) ->
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1)));
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange));
} else {
acknowledgementSet.complete();
}
}
} catch (AuthenticationException e) {
LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e);
LOG.warn("Authentication error while doing poll(). Will retry after 10 seconds", e);
topicMetrics.getNumberOfPollAuthErrors().increment();
Thread.sleep(10000);
} catch (RecordDeserializationException e) {
LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
topicMetrics.getNumberOfDeserializationErrors().increment();
consumer.seek(e.topicPartition(), e.offset()+1);
}
}

private void resetOrCommitOffsets() {
synchronized(partitionsToReset) {
private void resetOffsets() {
if (partitionsToReset.size() > 0) {
partitionsToReset.forEach(partition -> {
try {
final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
@@ -191,9 +180,35 @@ private void resetOrCommitOffsets() {
});
partitionsToReset.clear();
}
}

void processAcknowledgedOffsets() {
synchronized(acknowledgedOffsets) {
acknowledgedOffsets.forEach(offsets -> {
offsets.forEach((partition, offsetRange) -> {
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange);
} catch (Exception e) {
LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e);
}
});
});
acknowledgedOffsets.clear();
}
}

private void commitOffsets() {
if (topicConfig.getAutoCommit()) {
return;
}
processAcknowledgedOffsets();
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
@@ -221,10 +236,12 @@ public void run() {
consumer.subscribe(Arrays.asList(topicName));
while (!shutdownInProgress.get()) {
try {
resetOrCommitOffsets();
resetOffsets();
commitOffsets();
consumeRecords();
topicMetrics.update(consumer);
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic...", exp);
LOG.error("Error while reading the records from the topic {}", topicName, exp);
}
}
}
@@ -251,6 +268,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
}
} catch (Exception e){
LOG.error("Failed to parse JSON or AVRO record", e);
topicMetrics.getNumberOfRecordsFailedToParse().increment();
}
if (!plainTextMode) {
if (!(value instanceof Map)) {
@@ -293,7 +311,15 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
if (acknowledgementSet != null) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
while (true) {
try {
bufferAccumulator.add(record);
break;
} catch (SizeOverflowException e) {
topicMetrics.getNumberOfBufferSizeOverflows().increment();
Thread.sleep(100);
}
}
}
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
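The while/try loop added above applies backpressure rather than dropping data: each SizeOverflowException bumps the numberOfBufferSizeOverflows counter and the consumer thread sleeps 100 ms before retrying the write. Expressed as a standalone helper (hypothetical name, not part of this PR), the pattern is:

// Hypothetical helper equivalent to the retry loop above: keep retrying the
// buffer write until it succeeds, counting each overflow and backing off briefly.
private void addToBufferWithBackpressure(final Record<Event> record) throws Exception {
    while (true) {
        try {
            bufferAccumulator.add(record);
            return;
        } catch (SizeOverflowException e) {
            topicMetrics.getNumberOfBufferSizeOverflows().increment();
            Thread.sleep(100);
        }
    }
}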
@@ -21,7 +21,7 @@ public class TopicPartitionCommitTracker {

public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) {
this.topicPartition = topicPartition;
this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset : -1L;
this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset-1 : -1L;
this.offsetMaxMap = new HashMap<>();
this.offsetMinMap = new HashMap<>();
this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset));
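The committedOffset-1 change in the constructor fixes an off-by-one: Kafka reports the committed offset as the offset of the next record to consume, not the last record that was processed, so the tracker's starting position has to be one behind it. For example:

// Illustrative only: consumer.committed(partition) returns the next offset to consume.
// If it reports 100, records up to offset 99 are already committed, so the tracker
// must start from 99 (committedOffset - 1), which is what the constructor now does.
final OffsetAndMetadata committed = consumer.committed(partition);
final long lastCommitted = Objects.nonNull(committed) ? committed.offset() - 1 : -1L;  // e.g. 100 -> 99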
@@ -45,6 +45,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
@@ -118,12 +119,13 @@ public void start(Buffer<Record<Event>> buffer) {
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
Properties consumerProperties = getConsumerProperties(topic, authProperties);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
int numWorkers = topic.getWorkers();
executorService = Executors.newFixedThreadPool(numWorkers);
IntStream.range(0, numWorkers + 1).forEach(index -> {
IntStream.range(0, numWorkers).forEach(index -> {
switch (schema) {
case JSON:
kafkaConsumer = new KafkaConsumer<String, JsonNode>(consumerProperties);
Expand All @@ -136,7 +138,7 @@ public void start(Buffer<Record<Event>> buffer) {
kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
break;
}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, pluginMetrics);
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
executorService.submit(consumer);
});
} catch (Exception e) {
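A related fix in this hunk: IntStream.range excludes its upper bound, so range(0, numWorkers) now starts exactly numWorkers consumer tasks per topic, where the previous range(0, numWorkers + 1) started one extra. A quick illustration:

// IntStream.range(0, 3) yields 0, 1, 2 - exactly three iterations.
IntStream.range(0, 3).forEach(index -> System.out.println("starting consumer worker " + index));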
@@ -212,7 +212,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
try {
result = kafkaClient.getBootstrapBrokers(request);
} catch (KafkaException | StsException e) {
LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e);
LOG.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e);
try {
Thread.sleep(10000);
} catch (InterruptedException exp) {}
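Only the catch block is visible in this hunk. Based on the 10-second sleep and the MAX_KAFKA_CLIENT_RETRIES constant referenced by the new log message, the surrounding retry loop presumably has roughly the following shape (a sketch under those assumptions, not the code from this PR; GetBootstrapBrokersResponse is the AWS SDK v2 response type):

// Assumed shape of the retry loop around getBootstrapBrokers().
GetBootstrapBrokersResponse result = null;
int retries = 0;
while (result == null && retries < MAX_KAFKA_CLIENT_RETRIES) {
    try {
        result = kafkaClient.getBootstrapBrokers(request);
    } catch (KafkaException | StsException e) {
        LOG.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds",
                10 * MAX_KAFKA_CLIENT_RETRIES, e);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException exp) {}
        retries++;
    }
}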