From b739d18a5a5ffbc2f8c5b485aa31be304ea1280b Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 1 Aug 2023 23:30:51 +0000 Subject: [PATCH 1/3] Fix consumer synchronization. Fix consumer to use user-specified groupId Signed-off-by: Krishna Kondaka --- .../kafka/source/KafkaSourceJsonTypeIT.java | 4 +- .../kafka/configuration/TopicConfig.java | 64 +++++++++++-------- .../consumer/KafkaSourceCustomConsumer.java | 58 +++++++++-------- .../plugins/kafka/source/KafkaSource.java | 7 +- .../util/KafkaSourceSecurityConfigurer.java | 10 +-- .../kafka/configuration/TopicConfigTest.java | 30 +++++---- .../test/resources/sample-pipelines-1.yaml | 6 +- .../src/test/resources/sample-pipelines.yaml | 1 + 8 files changed, 100 insertions(+), 80 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index 6e7f612534..584ee24eb5 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -83,6 +83,7 @@ public class KafkaSourceJsonTypeIT { private String bootstrapServers; private String testKey; private String testTopic; + private String testGroup; public KafkaSource createObjectUnderTest() { return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription); @@ -112,7 +113,7 @@ public void setup() { } catch (Exception e){} testKey = RandomStringUtils.randomAlphabetic(5); - final String testGroup = "TestGroup_"+RandomStringUtils.randomAlphabetic(6); + testGroup = "TestGroup_"+RandomStringUtils.randomAlphabetic(6); testTopic = "TestJsonTopic_"+RandomStringUtils.randomAlphabetic(5); jsonTopic = mock(TopicConfig.class); when(jsonTopic.getName()).thenReturn(testTopic); @@ -337,6 +338,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception { Thread.sleep(1000); } kafkaSource.start(buffer); + assertThat(kafkaSource.getConsumer().groupMetadata().groupId(), equalTo(testGroup)); produceJsonRecords(bootstrapServers, topicName, numRecords); int numRetries = 0; while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java index 2f3ab61fcb..caf4f1236c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java @@ -18,23 +18,24 @@ * pipelines.yaml */ public class TopicConfig { - private static final String AUTO_COMMIT = "false"; - private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); - private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); - private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE; + static final boolean DEFAULT_AUTO_COMMIT = false; + static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); + static final Duration DEFAULT_SESSION_TIMEOUT = 
Duration.ofSeconds(45); + static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE; static final String DEFAULT_AUTO_OFFSET_RESET = "latest"; - static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5); - private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); - private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5); - private static final Duration MAX_RETRY_DELAY = Duration.ofSeconds(1); - private static final Integer FETCH_MAX_BYTES = 52428800; - private static final Integer FETCH_MAX_WAIT = 500; - private static final Integer FETCH_MIN_BYTES = 1; - private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100); - private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000); - private static final Integer CONSUMER_MAX_POLL_RECORDS = 500; + static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5); + static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); + static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5); + static final Duration DEFAULT_MAX_RETRY_DELAY = Duration.ofSeconds(1); + static final Integer DEFAULT_FETCH_MAX_BYTES = 52428800; + static final Integer DEFAULT_FETCH_MAX_WAIT = 500; + static final Integer DEFAULT_FETCH_MIN_BYTES = 1; + static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10); + static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10); + static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000); + static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500; static final Integer DEFAULT_NUM_OF_WORKERS = 2; - static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); + static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); @JsonProperty("name") @NotNull @@ -54,18 +55,18 @@ public class TopicConfig { @JsonProperty("max_retry_attempts") @Valid @Size(min = 1, max = Integer.MAX_VALUE, message = " Max retry attempts should lies between 1 and Integer.MAX_VALUE") - private Integer maxRetryAttempts = MAX_RETRY_ATTEMPT; + private Integer maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPT; @JsonProperty("max_retry_delay") @Valid @Size(min = 1) - private Duration maxRetryDelay = MAX_RETRY_DELAY; + private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY; @JsonProperty("serde_format") private MessageFormat serdeFormat= MessageFormat.PLAINTEXT; @JsonProperty("auto_commit") - private Boolean autoCommit = false; + private Boolean autoCommit = DEFAULT_AUTO_COMMIT; @JsonProperty("commit_interval") @Valid @@ -86,47 +87,50 @@ public class TopicConfig { private String groupName; @JsonProperty("thread_waiting_time") - private Duration threadWaitingTime = THREAD_WAITING_TIME; + private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME; @JsonProperty("max_record_fetch_time") - private Duration maxRecordFetchTime = MAX_RECORD_FETCH_TIME; + private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME; @JsonProperty("buffer_default_timeout") @Valid @Size(min = 1) - private Duration bufferDefaultTimeout = BUFFER_DEFAULT_TIMEOUT; + private Duration bufferDefaultTimeout = DEFAULT_BUFFER_TIMEOUT; @JsonProperty("fetch_max_bytes") @Valid @Size(min = 1, max = 52428800) - private Integer fetchMaxBytes = FETCH_MAX_BYTES; + private Integer fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES; @JsonProperty("fetch_max_wait") @Valid @Size(min = 1) - private Integer fetchMaxWait = FETCH_MAX_WAIT; + private Integer fetchMaxWait = DEFAULT_FETCH_MAX_WAIT; @JsonProperty("fetch_min_bytes") 
@Size(min = 1) @Valid - private Integer fetchMinBytes = FETCH_MIN_BYTES; + private Integer fetchMinBytes = DEFAULT_FETCH_MIN_BYTES; @JsonProperty("key_mode") private KafkaKeyMode kafkaKeyMode = KafkaKeyMode.INCLUDE_AS_FIELD; @JsonProperty("retry_backoff") - private Duration retryBackoff = RETRY_BACKOFF; + private Duration retryBackoff = DEFAULT_RETRY_BACKOFF; + + @JsonProperty("reconnect_backoff") + private Duration reconnectBackoff = DEFAULT_RECONNECT_BACKOFF; @JsonProperty("max_poll_interval") - private Duration maxPollInterval = MAX_POLL_INTERVAL; + private Duration maxPollInterval = DEFAULT_MAX_POLL_INTERVAL; @JsonProperty("consumer_max_poll_records") - private Integer consumerMaxPollRecords = CONSUMER_MAX_POLL_RECORDS; + private Integer consumerMaxPollRecords = DEFAULT_CONSUMER_MAX_POLL_RECORDS; @JsonProperty("heart_beat_interval") @Valid @Size(min = 1) - private Duration heartBeatInterval= HEART_BEAT_INTERVAL_DURATION; + private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION; public String getGroupId() { return groupId; @@ -220,6 +224,10 @@ public Duration getRetryBackoff() { return retryBackoff; } + public Duration getReconnectBackoff() { + return reconnectBackoff; + } + public void setRetryBackoff(Duration retryBackoff) { this.retryBackoff = retryBackoff; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 417518836b..69bf6ef738 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -15,6 +15,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.TopicPartition; import org.apache.avro.generic.GenericRecord; import org.opensearch.dataprepper.model.log.JacksonLog; @@ -40,6 +41,8 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; @@ -68,6 +71,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private static final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); private Map offsetsToCommit; + private Set partitionsToReset; private final AcknowledgementSetManager acknowledgementSetManager; private final Map partitionCommitTrackerMap; private final Counter positiveAcknowledgementSetCounter; @@ -95,6 +99,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; this.partitionCommitTrackerMap = new HashMap<>(); + this.partitionsToReset = new HashSet<>(); this.schema = MessageFormat.getByMessageFormatByName(schemaType); Duration bufferTimeout = Duration.ofSeconds(1); this.bufferAccumulator = 
BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); @@ -121,29 +126,21 @@ private AcknowledgementSet createAcknowledgementSet(Map { - try { - synchronized(consumer) { - OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); - consumer.seek(partition, committedOffsetAndMetadata); - } - } catch (Exception e) { - LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e); + synchronized(partitionsToReset) { + partitionsToReset.add(partition); } }); } @@ -157,10 +154,7 @@ private AcknowledgementSet createAcknowledgementSet(Map void consumeRecords() throws Exception { try { - ConsumerRecords records = null; - synchronized(consumer) { - records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); - } + ConsumerRecords records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { Map> offsets = new HashMap<>(); AcknowledgementSet acknowledgementSet = null; @@ -176,12 +170,27 @@ public void consumeRecords() throws Exception { } } } catch (AuthenticationException e) { - LOG.warn("Authentication Error while doing poll(). Will retry after 10 seconds", e); + LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e); Thread.sleep(10000); + } catch (RecordDeserializationException e) { + LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record", + e.topicPartition().topic(), e.topicPartition().partition(), e.offset()); + consumer.seek(e.topicPartition(), e.offset()+1); } } - private void commitOffsets() { + private void resetOrCommitOffsets() { + synchronized(partitionsToReset) { + partitionsToReset.forEach(partition -> { + try { + final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); + consumer.seek(partition, offsetAndMetadata); + } catch (Exception e) { + LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e); + } + }); + partitionsToReset.clear(); + } if (topicConfig.getAutoCommit()) { return; } @@ -194,13 +203,11 @@ private void commitOffsets() { return; } try { - synchronized(consumer) { - consumer.commitSync(); - } + consumer.commitSync(); offsetsToCommit.clear(); lastCommitTime = currentTimeMillis; } catch (CommitFailedException e) { - LOG.error("Failed to commit offsets in topic "+topicName, e); + LOG.error("Failed to commit offsets in topic {}", topicName, e); } } } @@ -214,8 +221,8 @@ public void run() { try { consumer.subscribe(Arrays.asList(topicName)); while (!shutdownInProgress.get()) { + resetOrCommitOffsets(); consumeRecords(); - commitOffsets(); } } catch (Exception exp) { LOG.error("Error while reading the records from the topic...", exp); @@ -306,9 +313,8 @@ public void shutdownConsumer(){ @Override public void onPartitionsAssigned(Collection partitions) { for (TopicPartition topicPartition : partitions) { - synchronized(consumer) { - Long committedOffset = consumer.committed(topicPartition).offset(); - consumer.seek(topicPartition, committedOffset); + synchronized(partitionsToReset) { + partitionsToReset.add(topicPartition); } } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 45f5a87446..b39c1f9ffa 100644 --- 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -95,6 +95,7 @@ public class KafkaSource implements Source> { private final Counter kafkaWorkerThreadProcessingErrors; private final PluginMetrics pluginMetrics; private KafkaSourceCustomConsumer consumer; + private KafkaConsumer kafkaConsumer; private String pipelineName; private String consumerGroupID; private String schemaType = MessageFormat.PLAINTEXT.toString(); @@ -125,7 +126,6 @@ public void start(Buffer> buffer) { int numWorkers = topic.getWorkers(); executorService = Executors.newFixedThreadPool(numWorkers); IntStream.range(0, numWorkers + 1).forEach(index -> { - KafkaConsumer kafkaConsumer; switch (schema) { case JSON: kafkaConsumer = new KafkaConsumer(consumerProperties); @@ -185,6 +185,9 @@ private long calculateLongestThreadWaitingTime() { orElse(1L); } + KafkaConsumer getConsumer() { + return kafkaConsumer; + } private Properties getConsumerProperties(final TopicConfig topicConfig) { Properties properties = new Properties(); @@ -361,6 +364,8 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) { properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID); + properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue()); + properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue()); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, topicConfig.getAutoCommit()); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index fd57e8bfce..19bd55d382 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ -17,9 +17,7 @@ import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest; import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse; import software.amazon.awssdk.services.kafka.model.InternalServerErrorException; -import software.amazon.awssdk.services.kafka.model.ConflictException; -import software.amazon.awssdk.services.kafka.model.ForbiddenException; -import software.amazon.awssdk.services.kafka.model.UnauthorizedException; +import software.amazon.awssdk.services.kafka.model.KafkaException; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.sts.model.StsException; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -214,17 +212,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth retryable = false; try { result = kafkaClient.getBootstrapBrokers(request); - } catch (InternalServerErrorException | ConflictException | ForbiddenException | UnauthorizedException | StsException e) { + } catch (KafkaException | StsException e) { 
LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e); - - retryable = true; try { Thread.sleep(10000); } catch (InterruptedException exp) {} } catch (Exception e) { throw new RuntimeException("Failed to get bootstrap server information from MSK.", e); } - } while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES); + } while (numRetries++ < MAX_KAFKA_CLIENT_RETRIES); if (Objects.isNull(result)) { throw new RuntimeException("Failed to get bootstrap server information from MSK after trying multiple times with retryable exceptions."); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java index db77e5cfeb..753d452095 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java @@ -67,21 +67,23 @@ void test_topicsConfig_not_null() { @Tag(YAML_FILE_WITH_MISSING_CONSUMER_CONFIG) void testConfigValues_default() { assertEquals("my-topic-2", topicConfig.getName()); - assertEquals(false, topicConfig.getAutoCommit()); - assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval()); - assertEquals(45000, topicConfig.getSessionTimeOut().toMillis()); + assertEquals("my-test-group", topicConfig.getGroupId()); + assertEquals(TopicConfig.DEFAULT_AUTO_COMMIT, topicConfig.getAutoCommit()); + assertEquals(TopicConfig.DEFAULT_COMMIT_INTERVAL, topicConfig.getCommitInterval()); + assertEquals(TopicConfig.DEFAULT_SESSION_TIMEOUT, topicConfig.getSessionTimeOut()); assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset()); - assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); - assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime()); - assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout()); - assertEquals(52428800L, topicConfig.getFetchMaxBytes().longValue()); - assertEquals(500L, topicConfig.getFetchMaxWait().longValue()); - assertEquals(1L, topicConfig.getFetchMinBytes().longValue()); - assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff()); - assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval()); - assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue()); - assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers().intValue()); - assertEquals(TopicConfig.HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval()); + assertEquals(TopicConfig.DEFAULT_THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); + assertEquals(TopicConfig.DEFAULT_MAX_RECORD_FETCH_TIME, topicConfig.getMaxRecordFetchTime()); + assertEquals(TopicConfig.DEFAULT_BUFFER_TIMEOUT, topicConfig.getBufferDefaultTimeout()); + assertEquals(TopicConfig.DEFAULT_FETCH_MAX_BYTES, topicConfig.getFetchMaxBytes()); + assertEquals(TopicConfig.DEFAULT_FETCH_MAX_WAIT, topicConfig.getFetchMaxWait()); + assertEquals(TopicConfig.DEFAULT_FETCH_MIN_BYTES, topicConfig.getFetchMinBytes()); + assertEquals(TopicConfig.DEFAULT_RETRY_BACKOFF, topicConfig.getRetryBackoff()); + assertEquals(TopicConfig.DEFAULT_RECONNECT_BACKOFF, topicConfig.getReconnectBackoff()); + assertEquals(TopicConfig.DEFAULT_MAX_POLL_INTERVAL, topicConfig.getMaxPollInterval()); + 
assertEquals(TopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS, topicConfig.getConsumerMaxPollRecords()); + assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers()); + assertEquals(TopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval()); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-1.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-1.yaml index c495902811..67f655e167 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-1.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines-1.yaml @@ -6,11 +6,11 @@ log-pipeline: topics: - name: my-topic-2 group_name: kafka-consumer-group-2 - group_id: DPKafkaProj-2 + group_id: my-test-group - name: my-topic-1 - group_id: DPKafkaProj-1 + group_id: my-test-group schema: registry_url: http://localhost:8081/ version: 1 sink: - - stdout: \ No newline at end of file + - stdout: diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index fe3407471b..a76c38565e 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -7,6 +7,7 @@ log-pipeline: encryption: plaintext topics: - name: my-topic-1 + group_id: my-test-group workers: 5 auto_commit: false commit_interval: PT5S From 5a2a857e970a580d95975668271b7053f7e0093c Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 1 Aug 2023 23:57:44 +0000 Subject: [PATCH 2/3] Fix check style error Signed-off-by: Krishna Kondaka --- .../plugins/kafka/util/KafkaSourceSecurityConfigurer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java index 19bd55d382..245056e06e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java @@ -16,7 +16,6 @@ import software.amazon.awssdk.services.kafka.KafkaClient; import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest; import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse; -import software.amazon.awssdk.services.kafka.model.InternalServerErrorException; import software.amazon.awssdk.services.kafka.model.KafkaException; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.sts.model.StsException; From 683a77f8e4f03cb228928853b1a3e5d9ee1083af Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 2 Aug 2023 06:03:44 +0000 Subject: [PATCH 3/3] Fixed to retry if consume records encounters an exception Signed-off-by: Krishna Kondaka --- .../kafka/consumer/KafkaSourceCustomConsumer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java 
b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 69bf6ef738..e1a071ffcb 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -218,14 +218,14 @@ Map getOffsetsToCommit() { @Override public void run() { - try { - consumer.subscribe(Arrays.asList(topicName)); - while (!shutdownInProgress.get()) { + consumer.subscribe(Arrays.asList(topicName)); + while (!shutdownInProgress.get()) { + try { resetOrCommitOffsets(); consumeRecords(); + } catch (Exception exp) { + LOG.error("Error while reading the records from the topic...", exp); } - } catch (Exception exp) { - LOG.error("Error while reading the records from the topic...", exp); } }
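Note (not part of the patch): the changes above remove the synchronized(consumer) blocks and instead confine all KafkaConsumer calls to the polling thread, which matters because KafkaConsumer is not safe for multi-threaded access; other threads (such as acknowledgement callbacks and the rebalance listener) now only record which partitions need to be rewound in the partitionsToReset set. The sketch below is a minimal, standalone illustration of that pattern, not a copy of KafkaSourceCustomConsumer: the class and method names (ThreadConfinedConsumerLoop, requestOffsetReset, applyPendingResets), the poll timeout, and the null check on committed() are invented for illustration.

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Minimal sketch of the thread-confinement pattern used in the patch:
 * only the polling thread touches the KafkaConsumer; other threads merely
 * record which partitions need to be rewound, and the polling thread
 * applies those seeks itself on its next iteration.
 */
class ThreadConfinedConsumerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;   // owned by the polling thread only
    private final Set<TopicPartition> partitionsToReset = new HashSet<>();
    private final AtomicBoolean shutdownInProgress;
    private final String topicName;

    ThreadConfinedConsumerLoop(final KafkaConsumer<String, String> consumer,
                               final String topicName,
                               final AtomicBoolean shutdownInProgress) {
        this.consumer = consumer;
        this.topicName = topicName;
        this.shutdownInProgress = shutdownInProgress;
    }

    /** Called from another thread, e.g. on a negative acknowledgement or partition assignment. */
    void requestOffsetReset(final TopicPartition partition) {
        synchronized (partitionsToReset) {
            partitionsToReset.add(partition);
        }
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topicName));
        while (!shutdownInProgress.get()) {
            try {                                   // a failure in one iteration must not end the thread
                applyPendingResets();
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                // ... hand records to the buffer, track offsets, commitSync() periodically ...
            } catch (CommitFailedException e) {
                // offsets will be committed on a later attempt; keep polling
            } catch (Exception e) {
                // log and continue; only shutdownInProgress stops the loop
            }
        }
        consumer.close();
    }

    /** Runs on the polling thread only, so no synchronization on the consumer is needed. */
    private void applyPendingResets() {
        synchronized (partitionsToReset) {
            for (TopicPartition partition : partitionsToReset) {
                OffsetAndMetadata committed = consumer.committed(partition);
                if (committed != null) {
                    consumer.seek(partition, committed.offset()); // rewind to the last committed position
                }
            }
            partitionsToReset.clear();
        }
    }
}

The same confinement is what makes the change in PATCH 3/3 safe: with the consumer owned by a single thread, the try/catch can sit inside the while loop so that an exception from one poll or commit is logged and the loop continues, instead of terminating the consumer thread on the first error.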