From b7ec94d1cc8e5b97924698c002bf1bfbf4817483 Mon Sep 17 00:00:00 2001 From: Hardeep Singh Date: Tue, 22 Aug 2023 15:12:26 -0700 Subject: [PATCH] Kafka source fixes: commit offsets, consumer group mutations, consumer shutdown (#3207) Removed acknowledgments_timeout config from kafka source Signed-off-by: Hardeep Singh (cherry picked from commit b5443634a4704cec4c33ec386747e12270aed073) --- .../kafka/source/KafkaSourceJsonTypeIT.java | 47 ++--- .../source/KafkaSourceMultipleAuthTypeIT.java | 43 ++-- .../source/MskGlueRegistryMultiTypeIT.java | 68 +++---- .../configuration/KafkaSourceConfig.java | 10 - .../kafka/consumer/CommitOffsetRange.java | 21 ++ .../consumer/KafkaSourceCustomConsumer.java | 191 ++++++++++++------ .../plugins/kafka/source/KafkaSource.java | 57 +++--- .../configuration/KafkaSourceConfigTest.java | 9 +- .../KafkaSourceCustomConsumerTest.java | 65 +++--- 9 files changed, 297 insertions(+), 214 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/CommitOffsetRange.java diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 5239eec755..afdfd9f8b0 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -5,52 +5,50 @@ package org.opensearch.dataprepper.plugins.kafka.source; -import org.apache.kafka.clients.producer.KafkaProducer; +import io.micrometer.core.instrument.Counter; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import 
org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import static org.mockito.Mockito.when; -import org.mockito.Mock; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import org.apache.commons.lang3.RandomStringUtils; - -import io.micrometer.core.instrument.Counter; -import java.util.List; -import java.util.Map; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import java.time.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KafkaSourceJsonTypeIT { private static final int TEST_ID = 123456; @@ -104,7 +102,6 @@ public void setup() { acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor); pipelineDescription = mock(PipelineDescription.class); when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false); - when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); when(sourceConfig.getSchemaConfig()).thenReturn(null); when(pluginMetrics.counter(anyString())).thenReturn(counter); when(pipelineDescription.getPipelineName()).thenReturn("testPipeline"); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 6179ba4f57..aca9c8dd5c 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -5,48 +5,46 @@ package org.opensearch.dataprepper.plugins.kafka.source; -import org.apache.kafka.clients.producer.KafkaProducer; +import io.micrometer.core.instrument.Counter; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import static org.mockito.Mockito.when; -import org.mockito.Mock; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import org.apache.commons.lang3.RandomStringUtils; - -import io.micrometer.core.instrument.Counter; -import java.util.List; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import java.time.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KafkaSourceMultipleAuthTypeIT { @Mock @@ -112,7 +110,6 @@ public void setup() { acknowledgementSetManager = mock(AcknowledgementSetManager.class); pipelineDescription = mock(PipelineDescription.class); when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false); - when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); when(sourceConfig.getSchemaConfig()).thenReturn(null); when(pluginMetrics.counter(anyString())).thenReturn(counter); when(pipelineDescription.getPipelineName()).thenReturn("testPipeline"); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java index 9eb222c496..6bd4202cf6 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java +++ 
b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java @@ -5,63 +5,60 @@ package org.opensearch.dataprepper.plugins.kafka.source; -import org.apache.kafka.clients.producer.KafkaProducer; +import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer; +import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import io.micrometer.core.instrument.Counter; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; -import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; -import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer; -import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; -import org.apache.avro.Schema; - -import static org.mockito.Mockito.when; -import org.mockito.Mock; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static 
org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.equalTo; -import org.apache.commons.lang3.RandomStringUtils; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import io.micrometer.core.instrument.Counter; -import java.util.List; -import java.util.Map; +import java.io.File; +import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.TimeUnit; -import java.io.File; -import java.io.IOException; - -import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericData; -import org.apache.kafka.common.errors.SerializationException; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MskGlueRegistryMultiTypeIT { private static final String TEST_USER = "user"; @@ -145,7 +142,6 @@ public void setup() { acknowledgementSetManager = mock(AcknowledgementSetManager.class); pipelineDescription = mock(PipelineDescription.class); when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false); - when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig); when(schemaConfig.getType()).thenReturn(SchemaRegistryType.AWS_GLUE); when(pluginMetrics.counter(anyString())).thenReturn(counter); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index 922c8f0d08..38f11aefe2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.Objects; -import java.time.Duration; /** * * A helper class that helps to read user configuration values from @@ -36,8 +35,6 @@ public boolean getInsecure() { } } - public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30); - @JsonProperty("bootstrap_servers") private List bootStrapServers; @@ -64,9 +61,6 @@ public boolean getInsecure() { @JsonProperty("acknowledgments") private Boolean acknowledgementsEnabled = false; - @JsonProperty("acknowledgments_timeout") - private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; - @JsonProperty("client_dns_lookup") private String clientDnsLookup; @@ -78,10 +72,6 @@ public Boolean getAcknowledgementsEnabled() { return acknowledgementsEnabled; } - public Duration getAcknowledgementsTimeout() { - return acknowledgementsTimeout; - } - public List getTopics() { return topics; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/CommitOffsetRange.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/CommitOffsetRange.java new file mode 100644 index 0000000000..fc6e206119 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/CommitOffsetRange.java @@ -0,0 +1,21 @@ +package org.opensearch.dataprepper.plugins.kafka.consumer; + +import org.apache.commons.lang3.Range; + +public class CommitOffsetRange { + private final long epoch; + private final Range offsets; + + public CommitOffsetRange(final Range offsets, final long epoch) { + this.offsets = offsets; + this.epoch = epoch; + } + + public long getEpoch() { + return epoch; + } + + public Range getOffsets() { + return offsets; + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 96767016c5..6782640add 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -4,38 +4,42 @@ */ package org.opensearch.dataprepper.plugins.kafka.consumer; +import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; +import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.Range; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.TopicPartition; -import org.apache.avro.generic.GenericRecord; -import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; import 
org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.Instant; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -45,16 +49,13 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; -import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; -import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; -import org.apache.commons.lang3.Range; +import java.util.concurrent.atomic.AtomicInteger; /** * * A utility class which will handle the core Kafka consumer operation. */ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; @@ -71,14 +72,16 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private static final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); private Map offsetsToCommit; + private Map ownedPartitionsEpoch; private Set partitionsToReset; private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; - private List>> acknowledgedOffsets; + private List> acknowledgedOffsets; private final boolean acknowledgementsEnabled; private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; private long metricsUpdatedTime; + private final AtomicInteger numberOfAcksPending; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -96,11 +99,12 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.topicMetrics = topicMetrics; this.topicMetrics.register(consumer); this.offsetsToCommit = new HashMap<>(); + this.ownedPartitionsEpoch = new HashMap<>(); this.metricsUpdatedTime = Instant.now().getEpochSecond(); this.acknowledgedOffsets = new ArrayList<>(); - this.acknowledgementsTimeout = sourceConfig.getAcknowledgementsTimeout(); + this.acknowledgementsTimeout = Duration.ofSeconds(Integer.MAX_VALUE); // If the timeout value is different from default value, then enable acknowledgements automatically. 
- this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || acknowledgementsTimeout != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT; + this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled(); this.acknowledgementSetManager = acknowledgementSetManager; this.partitionCommitTrackerMap = new HashMap<>(); this.partitionsToReset = Collections.synchronizedSet(new HashSet<>()); @@ -108,6 +112,12 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, Duration bufferTimeout = Duration.ofSeconds(1); this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); this.lastCommitTime = System.currentTimeMillis(); + this.numberOfAcksPending = new AtomicInteger(0); + } + + private long getCurrentTimeNanos() { + Instant now = Instant.now(); + return now.getEpochSecond()*1000000000+now.getNano(); } public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { @@ -122,21 +132,24 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn } } - private AcknowledgementSet createAcknowledgementSet(Map> offsets) { + private AcknowledgementSet createAcknowledgementSet(Map offsets) { AcknowledgementSet acknowledgementSet = - acknowledgementSetManager.create((result) -> { - if (result == true) { - topicMetrics.getNumberOfPositiveAcknowledgements().increment(); - synchronized(acknowledgedOffsets) { - acknowledgedOffsets.add(offsets); + acknowledgementSetManager.create((result) -> { + numberOfAcksPending.decrementAndGet(); + if (result == true) { + topicMetrics.getNumberOfPositiveAcknowledgements().increment(); + synchronized(this) { + acknowledgedOffsets.add(offsets); + } + } else { + topicMetrics.getNumberOfNegativeAcknowledgements().increment(); + synchronized(this) { + offsets.forEach((partition, offsetRange) -> { + partitionsToReset.add(partition); + }); + } } - } else { - topicMetrics.getNumberOfNegativeAcknowledgements().increment(); - offsets.forEach((partition, offsetRange) -> { - partitionsToReset.add(partition); - }); - } - }, acknowledgementsTimeout); + }, acknowledgementsTimeout); return acknowledgementSet; } @@ -144,7 +157,7 @@ public void consumeRecords() throws Exception { try { ConsumerRecords records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { - Map> offsets = new HashMap<>(); + Map offsets = new HashMap<>(); AcknowledgementSet acknowledgementSet = null; if (acknowledgementsEnabled) { acknowledgementSet = createAcknowledgementSet(offsets); @@ -152,9 +165,12 @@ public void consumeRecords() throws Exception { iterateRecordPartitions(records, acknowledgementSet, offsets); if (!acknowledgementsEnabled) { offsets.forEach((partition, offsetRange) -> - updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange)); + updateOffsetsToCommit(partition, + new OffsetAndMetadata(offsetRange.getOffsets().getMaximum() + 1), + offsetRange.getOffsets())); } else { acknowledgementSet.complete(); + numberOfAcksPending.incrementAndGet(); } } } catch (AuthenticationException e) { @@ -163,7 +179,7 @@ public void consumeRecords() throws Exception { Thread.sleep(10000); } catch (RecordDeserializationException e) { LOG.warn("Deserialization error - topic {} partition {} offset {}", - e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); + e.topicPartition().topic(), 
e.topicPartition().partition(), e.offset()); if (e.getCause() instanceof AWSSchemaRegistryException) { LOG.warn("AWSSchemaRegistryException: {}. Retrying after 30 seconds", e.getMessage()); Thread.sleep(30000); @@ -183,8 +199,10 @@ private void resetOffsets() { if (Objects.isNull(offsetAndMetadata)) { consumer.seek(partition, 0L); } else { + LOG.info("Seeking partition {} to {}", partition, offsetAndMetadata.offset()); consumer.seek(partition, offsetAndMetadata); } + partitionCommitTrackerMap.remove(partition.partition()); } catch (Exception e) { LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e); } @@ -194,34 +212,39 @@ private void resetOffsets() { } void processAcknowledgedOffsets() { - synchronized(acknowledgedOffsets) { - acknowledgedOffsets.forEach(offsets -> { - offsets.forEach((partition, offsetRange) -> { + + acknowledgedOffsets.forEach(offsets -> { + offsets.forEach((partition, offsetRange) -> { + + if (getPartitionEpoch(partition) == offsetRange.getEpoch()) { try { int partitionId = partition.partition(); if (!partitionCommitTrackerMap.containsKey(partitionId)) { OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; + LOG.info("Tracking offsets for partition{} starting with committedOffset {}", partition, + committedOffset); partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); } - OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); - updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange); + OffsetAndMetadata offsetAndMetadata = + partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange.getOffsets()); + updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange.getOffsets()); } catch (Exception e) { LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e); } - }); + } }); - acknowledgedOffsets.clear(); - } + }); + acknowledgedOffsets.clear(); } - private void commitOffsets() { + private void commitOffsets(boolean forceCommit) { if (topicConfig.getAutoCommit()) { return; } processAcknowledgedOffsets(); long currentTimeMillis = System.currentTimeMillis(); - if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { + if (!forceCommit && (currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { return; } synchronized (offsetsToCommit) { @@ -229,12 +252,12 @@ private void commitOffsets() { return; } try { - consumer.commitSync(); - offsetsToCommit.clear(); - lastCommitTime = currentTimeMillis; - } catch (CommitFailedException e) { + consumer.commitSync(offsetsToCommit); + } catch (Exception e) { LOG.error("Failed to commit offsets in topic {}", topicName, e); } + offsetsToCommit.clear(); + lastCommitTime = currentTimeMillis; } } @@ -244,15 +267,27 @@ Map getOffsetsToCommit() { @Override public void run() { - consumer.subscribe(Arrays.asList(topicName)); + consumer.subscribe(Arrays.asList(topicName), this); + Set partitions = consumer.assignment(); + synchronized (ownedPartitionsEpoch) { + final long currentEpoch = getCurrentTimeNanos(); + partitions.forEach((partition) -> { + final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); + LOG.info("Starting consumer with topic partition ({}) offset {}", partition, offsetAndMetadata); + 
ownedPartitionsEpoch.put(partition, currentEpoch); + }); + } + boolean retryingAfterException = false; while (!shutdownInProgress.get()) { try { if (retryingAfterException) { Thread.sleep(10000); } - resetOffsets(); - commitOffsets(); + synchronized(this) { + commitOffsets(false); + resetOffsets(); + } consumeRecords(); topicMetrics.update(consumer); retryingAfterException = false; @@ -261,6 +296,10 @@ public void run() { retryingAfterException = true; } } + LOG.info("Shutting down, number of acks pending = {}", numberOfAcksPending.get()); + synchronized(this) { + commitOffsets(true); + } } private Record getRecord(ConsumerRecord consumerRecord, int partition) { @@ -314,9 +353,15 @@ private Record getRecord(ConsumerRecord consumerRecord, in return new Record(event); } - private void iterateRecordPartitions(ConsumerRecords records, final AcknowledgementSet acknowledgementSet, Map> offsets) throws Exception { + private void iterateRecordPartitions(ConsumerRecords records, final AcknowledgementSet acknowledgementSet, + Map offsets) throws Exception { for (TopicPartition topicPartition : records.partitions()) { - List> kafkaRecords = new ArrayList<>(); + final long partitionEpoch = getPartitionEpoch(topicPartition); + if (partitionEpoch == 0) { + LOG.info("Skipping partition {}, lost ownership", topicPartition); + continue; + } + List> partitionRecords = records.records(topicPartition); for (ConsumerRecord consumerRecord : partitionRecords) { Record record = getRecord(consumerRecord, topicPartition.partition()); @@ -339,10 +384,11 @@ private void iterateRecordPartitions(ConsumerRecords records, fin } } } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); long firstOffset = partitionRecords.get(0).offset(); Range offsetRange = Range.between(firstOffset, lastOffset); - offsets.put(topicPartition, offsetRange); + offsets.put(topicPartition, new CommitOffsetRange(offsetRange, partitionEpoch)); } } @@ -353,16 +399,43 @@ public void closeConsumer(){ public void shutdownConsumer(){ consumer.wakeup(); } + @Override public void onPartitionsAssigned(Collection partitions) { - for (TopicPartition topicPartition : partitions) { - synchronized(partitionsToReset) { - partitionsToReset.add(topicPartition); + synchronized(this) { + final long epoch = getCurrentTimeNanos(); + for (TopicPartition topicPartition : partitions) { + if (ownedPartitionsEpoch.containsKey(topicPartition)) { + LOG.info("Partition {} already owned", topicPartition); + continue; + } + final OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); + LOG.info("Assigned new partition {}, committed offset: {}",topicPartition, + Objects.isNull(offsetAndMetadata) ? 
0 : offsetAndMetadata.offset()); + + partitionCommitTrackerMap.remove(topicPartition.partition()); + ownedPartitionsEpoch.put(topicPartition, epoch); } } } @Override public void onPartitionsRevoked(Collection partitions) { + synchronized(this) { + commitOffsets(true); + for (TopicPartition topicPartition : partitions) { + if (!ownedPartitionsEpoch.containsKey(topicPartition)) { + LOG.info("Partition {} not owned ", topicPartition); + continue; + } + LOG.info("Revoked partition {}", topicPartition); + ownedPartitionsEpoch.remove(topicPartition); + partitionCommitTrackerMap.remove(topicPartition.partition()); + } + } + } + + private long getPartitionEpoch(final TopicPartition topicPartition) { + return ownedPartitionsEpoch.getOrDefault(topicPartition, 0L); } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index ea021f8474..7bcd296821 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -5,34 +5,34 @@ package org.opensearch.dataprepper.plugins.kafka.source; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaJsonDeserializer; import kafka.common.BrokerEndPointNotAvailableException; import org.apache.avro.generic.GenericRecord; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; - import 
org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; @@ -44,11 +44,8 @@ import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceSecurityConfigurer; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; - -import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; - +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,25 +54,26 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; -import java.net.InetAddress; -import java.net.Socket; -import java.util.Map; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.Comparator; import java.util.Properties; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.IntStream; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; /** * The starting point of the Kafka-source plugin and the Kafka consumer @@ -100,6 +98,8 @@ public class KafkaSource implements Source> { private static CachedSchemaRegistryClient schemaRegistryClient; private GlueSchemaRegistryKafkaDeserializer glueDeserializer; private StringDeserializer stringDeserializer; + private final Map topicExecutorService; + private final Map topicConsumer; @DataPrepperPluginConstructor public KafkaSource(final KafkaSourceConfig sourceConfig, @@ -112,6 +112,8 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, this.pipelineName = pipelineDescription.getPipelineName(); this.stringDeserializer = new StringDeserializer(); shutdownInProgress = new AtomicBoolean(false); + topicExecutorService = new HashMap<>(); + topicConsumer = new HashMap<>(); } @Override @@ -146,6 +148,9 @@ public void start(Buffer> buffer) { } consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics); executorService.submit(consumer); + + topicExecutorService.put(topic, executorService); + topicConsumer.put(topic, consumer); }); } catch (Exception e) { if (e instanceof BrokerNotAvailableException || @@ -162,8 +167,15 @@ public void start(Buffer> buffer) { @Override public void stop() { - LOG.info("Shutting down Consumers..."); shutdownInProgress.set(true); + LOG.info("Shutting down Consumers..."); + sourceConfig.getTopics().forEach(topic -> { + stopConsumer(topicExecutorService.get(topic), topicConsumer.get(topic)); + }); + LOG.info("Consumer shutdown successfully..."); + } + + public void stopConsumer(final ExecutorService executorService, final 
KafkaSourceCustomConsumer consumer) { executorService.shutdown(); try { if (!executorService.awaitTermination( @@ -178,7 +190,7 @@ public void stop() { Thread.currentThread().interrupt(); } } - LOG.info("Consumer shutdown successfully..."); + consumer.closeConsumer(); } private long calculateLongestThreadWaitingTime() { @@ -270,7 +282,6 @@ private static String getSchemaType(final String registryUrl, final String topic // the schemaType to PLAINTEXT LOG.error("GET request failed while fetching the schema registry. Defaulting to schema type PLAINTEXT"); return MessageFormat.PLAINTEXT.toString(); - } } catch (IOException e) { LOG.error("An error while fetching the schema registry details : ", e); @@ -472,4 +483,4 @@ private String getMaskedBootStrapDetails(String serverIP) { } return maskedString.append(serverIP.substring(maskedLength)).toString(); } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java index 55835f0da9..2da916d79f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java @@ -11,12 +11,11 @@ import java.io.IOException; import java.io.Reader; import java.io.StringReader; -import java.util.List; -import java.util.Collections; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; -import java.time.Duration; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; @@ -66,7 +65,6 @@ void test_bootStrapServers_not_null(){ @Test void test_topics_not_null(){ assertEquals(false, kafkaSourceConfig.getAcknowledgementsEnabled()); - assertEquals(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT, kafkaSourceConfig.getAcknowledgementsTimeout()); assertThat(kafkaSourceConfig.getTopics(), notNullValue()); } @@ -81,10 +79,7 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException { assertEquals("127.0.0.1:9092", kafkaSourceConfig.getBootStrapServers()); assertEquals(Collections.singletonList(topicConfig), kafkaSourceConfig.getTopics()); setField(KafkaSourceConfig.class, kafkaSourceConfig, "acknowledgementsEnabled", true); - Duration testTimeout = Duration.ofSeconds(10); - setField(KafkaSourceConfig.class, kafkaSourceConfig, "acknowledgementsTimeout", testTimeout); assertEquals(true, kafkaSourceConfig.getAcknowledgementsEnabled()); - assertEquals(testTimeout, kafkaSourceConfig.getAcknowledgementsTimeout()); assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionConfig().getType()); setField(KafkaSourceConfig.EncryptionConfig.class, encryptionConfig, "type", EncryptionType.NONE); assertEquals(EncryptionType.NONE, encryptionConfig.getType()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 47080515d3..5eec03f260 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -5,55 +5,55 @@ package org.opensearch.dataprepper.plugins.kafka.consumer; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; -import io.micrometer.core.instrument.Counter; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.any; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.MatcherAssert.assertThat; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; -import org.mockito.Mock; +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import java.util.Map; -import java.util.HashMap; -import java.util.Arrays; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import 
java.time.Duration; +import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -123,7 +123,6 @@ public void setUp() { public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled); - when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT); return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics); } @@ -144,6 +143,7 @@ public void testPlainTextConsumeRecords() throws InterruptedException { consumer = createObjectUnderTest("plaintext", false); try { + consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); consumer.consumeRecords(); } catch (Exception e){} final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); @@ -179,6 +179,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted consumer = createObjectUnderTest("plaintext", true); try { + consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); consumer.consumeRecords(); } catch (Exception e){} final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); @@ -223,6 +224,7 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int consumer = createObjectUnderTest("plaintext", true); try { + consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); consumer.consumeRecords(); } catch (Exception e){} final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); @@ -263,6 +265,7 @@ public void testJsonConsumeRecords() throws InterruptedException, Exception { when(kafkaConsumer.poll(anyLong())).thenReturn(consumerRecords); consumer = createObjectUnderTest("json", false); + consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testJsonPartition))); consumer.consumeRecords(); final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); ArrayList> bufferedRecords = new ArrayList<>(bufferRecords.getKey());