From c75ed7baacce96acc35a6baecbf79ce80f30d7e0 Mon Sep 17 00:00:00 2001
From: kkondaka <41027584+kkondaka@users.noreply.github.com>
Date: Fri, 4 Aug 2023 12:50:56 -0700
Subject: [PATCH] Fix code to set max poll interval and fetch min bytes config
 (#3115)

Signed-off-by: Krishna Kondaka
Co-authored-by: Krishna Kondaka
---
 .../kafka/configuration/TopicConfig.java      |  8 ++++++++
 .../plugins/kafka/source/KafkaSource.java     | 20 +++++++++----------
 .../kafka/configuration/TopicConfigTest.java  |  2 ++
 .../src/test/resources/sample-pipelines.yaml  |  1 +
 4 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
index caf4f1236c..d613db3cf5 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
@@ -32,6 +32,7 @@ public class TopicConfig {
     static final Integer DEFAULT_FETCH_MIN_BYTES = 1;
     static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
     static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
+    static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
     static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
     static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
     static final Integer DEFAULT_NUM_OF_WORKERS = 2;
@@ -92,6 +93,9 @@ public class TopicConfig {
     @JsonProperty("max_record_fetch_time")
     private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME;
 
+    @JsonProperty("max_partition_fetch_bytes")
+    private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;
+
     @JsonProperty("buffer_default_timeout")
     @Valid
     @Size(min = 1)
@@ -192,6 +196,10 @@ public Duration getMaxRecordFetchTime() {
         return maxRecordFetchTime;
     }
 
+    public Integer getMaxPartitionFetchBytes() {
+        return maxPartitionFetchBytes;
+    }
+
     public void setMaxRecordFetchTime(Duration maxRecordFetchTime) {
         this.maxRecordFetchTime = maxRecordFetchTime;
     }
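A note on the TopicConfig change above: the new max_partition_fetch_bytes option is bound from pipeline yaml by Jackson through the @JsonProperty annotation, and its default of 1048576 bytes (1 MiB) matches the Kafka client's own default for max.partition.fetch.bytes. The following illustrative sketch (separate from the patch) shows how the binding behaves; it assumes jackson-dataformat-yaml on the classpath and a usable default constructor on TopicConfig, and the class name TopicConfigBindingSketch is made up for the example.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    public class TopicConfigBindingSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
            // No key present: the field keeps its initializer,
            // DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576 (1 MiB).
            TopicConfig defaults = mapper.readValue("{}", TopicConfig.class);
            System.out.println(defaults.getMaxPartitionFetchBytes()); // 1048576
            // An explicit key overrides the default; 10485760 is the 10 MiB
            // value used in the updated sample-pipelines.yaml below.
            TopicConfig custom = mapper.readValue(
                    "max_partition_fetch_bytes: 10485760", TopicConfig.class);
            System.out.println(custom.getMaxPartitionFetchBytes()); // 10485760
        }
    }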
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index b39c1f9ffa..21af54160d 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -14,7 +14,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.micrometer.core.instrument.Counter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -87,12 +86,10 @@
 @SuppressWarnings("deprecation")
 @DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
 public class KafkaSource implements Source<Record<Event>> {
-    private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors";
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
     private final KafkaSourceConfig sourceConfig;
     private AtomicBoolean shutdownInProgress;
     private ExecutorService executorService;
-    private final Counter kafkaWorkerThreadProcessingErrors;
     private final PluginMetrics pluginMetrics;
     private KafkaSourceCustomConsumer consumer;
     private KafkaConsumer kafkaConsumer;
@@ -112,15 +109,16 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
         this.pluginMetrics = pluginMetrics;
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.pipelineName = pipelineDescription.getPipelineName();
-        this.kafkaWorkerThreadProcessingErrors = pluginMetrics.counter(KAFKA_WORKER_THREAD_PROCESSING_ERRORS);
         shutdownInProgress = new AtomicBoolean(false);
     }
 
     @Override
     public void start(Buffer<Record<Event>> buffer) {
+        Properties authProperties = new Properties();
+        KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
         sourceConfig.getTopics().forEach(topic -> {
             consumerGroupID = topic.getGroupId();
-            Properties consumerProperties = getConsumerProperties(topic);
+            Properties consumerProperties = getConsumerProperties(topic, authProperties);
             MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
             try {
                 int numWorkers = topic.getWorkers();
@@ -189,12 +187,8 @@ KafkaConsumer getConsumer() {
         return kafkaConsumer;
     }
 
-    private Properties getConsumerProperties(final TopicConfig topicConfig) {
-        Properties properties = new Properties();
-        KafkaSourceSecurityConfigurer.setAuthProperties(properties, sourceConfig, LOG);
-        /* if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
-            throw new RuntimeException("Can't be able to connect to the given Kafka brokers... ");
-        }*/
+    private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
+        Properties properties = (Properties)authProperties.clone();
         if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
             ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
             switch (dnsLookupType) {
@@ -364,6 +358,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
 
     private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
+        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, topicConfig.getMaxPartitionFetchBytes());
         properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
         properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -374,10 +369,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic
                 topicConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
                 topicConfig.getConsumerMaxPollRecords());
+        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
+                ((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
         properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
         properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue());
         properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
+        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, topicConfig.getFetchMinBytes());
     }
 
     private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
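The KafkaSource change does two things. First, authentication properties are now built once in start() and cloned per topic, instead of being rebuilt inside getConsumerProperties() for every topic (the dead commented-out broker check is dropped along the way). Second, and the point of the fix: max_poll_interval and fetch_min_bytes were already parsed into TopicConfig but never copied into the consumer Properties, so the Kafka client silently fell back to its own defaults; they are now forwarded, together with the new max_partition_fetch_bytes. The standalone sketch below (separate from the patch) illustrates the forwarding pattern; the ConsumerConfig keys are real Kafka client constants, while the class name, the security.protocol stand-in, and the literal values are made up for the example.

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConsumerPropsSketch {
        public static void main(String[] args) {
            // Stand-in for KafkaSourceSecurityConfigurer.setAuthProperties(...).
            Properties authProperties = new Properties();
            authProperties.put("security.protocol", "PLAINTEXT");
            // Clone per topic so one topic's settings never leak into another's.
            Properties properties = (Properties) authProperties.clone();
            // The Kafka client validates these settings as int values, hence the
            // Duration -> long -> int conversion the patch uses throughout.
            Duration maxPollInterval = Duration.ofMillis(300000); // illustrative
            properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
                    ((Long) maxPollInterval.toMillis()).intValue());
            properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
            properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
            System.out.println(properties);
        }
    }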
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java
index 753d452095..e244ed15b1 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java
@@ -84,6 +84,7 @@ void testConfigValues_default() {
         assertEquals(TopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS, topicConfig.getConsumerMaxPollRecords());
         assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers());
         assertEquals(TopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval());
+        assertEquals(TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
     }
 
     @Test
@@ -105,6 +106,7 @@ void testConfigValues_from_yaml() {
         assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
         assertEquals(5, topicConfig.getWorkers().intValue());
         assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
+        assertEquals(10*TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
     }
 
     @Test
diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
index a76c38565e..e6ecc1a243 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
+++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
@@ -23,6 +23,7 @@ log-pipeline:
         fetch_min_bytes: 1
         retry_backoff: PT100S
         consumer_max_poll_records: 500
+        max_partition_fetch_bytes: 10485760
       schema:
         registry_url: http://localhost:8081/
         version: 1