
Fix code to set max poll interval and fetch min bytes config (#3115)
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Aug 4, 2023
1 parent bdbb174 commit c75ed7b
Showing 4 changed files with 20 additions and 11 deletions.
@@ -32,6 +32,7 @@ public class TopicConfig {
static final Integer DEFAULT_FETCH_MIN_BYTES = 1;
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
@@ -92,6 +93,9 @@ public class TopicConfig {
@JsonProperty("max_record_fetch_time")
private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME;

@JsonProperty("max_partition_fetch_bytes")
private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;

@JsonProperty("buffer_default_timeout")
@Valid
@Size(min = 1)
@@ -192,6 +196,10 @@ public Duration getMaxRecordFetchTime() {
return maxRecordFetchTime;
}

public Integer getMaxPartitionFetchBytes() {
return maxPartitionFetchBytes;
}

public void setMaxRecordFetchTime(Duration maxRecordFetchTime) {
this.maxRecordFetchTime = maxRecordFetchTime;
}
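
For reference, the new option follows the same Jackson pattern as the rest of TopicConfig: a snake_case @JsonProperty key bound to a field whose initializer supplies the default. Below is a minimal, standalone sketch of that pattern, assuming Jackson's YAML module is on the classpath; the class name and the main() harness are illustrative only, not part of the plugin.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class TopicSettingsSketch {
    // 1 MiB default, matching DEFAULT_MAX_PARTITION_FETCH_BYTES in the diff.
    static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;

    @JsonProperty("max_partition_fetch_bytes")
    private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;

    public Integer getMaxPartitionFetchBytes() {
        return maxPartitionFetchBytes;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper yaml = new ObjectMapper(new YAMLFactory());
        // Keys present in the YAML override the field initializer...
        TopicSettingsSketch fromYaml =
                yaml.readValue("max_partition_fetch_bytes: 10485760", TopicSettingsSketch.class);
        System.out.println(fromYaml.getMaxPartitionFetchBytes());            // 10485760
        // ...while unset keys keep the default supplied by the initializer.
        System.out.println(new TopicSettingsSketch().getMaxPartitionFetchBytes()); // 1048576
    }
}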
@@ -14,7 +14,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
@@ -87,12 +86,10 @@
@SuppressWarnings("deprecation")
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
public class KafkaSource implements Source<Record<Event>> {
private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors";
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private AtomicBoolean shutdownInProgress;
private ExecutorService executorService;
private final Counter kafkaWorkerThreadProcessingErrors;
private final PluginMetrics pluginMetrics;
private KafkaSourceCustomConsumer consumer;
private KafkaConsumer kafkaConsumer;
@@ -112,15 +109,16 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pipelineName = pipelineDescription.getPipelineName();
this.kafkaWorkerThreadProcessingErrors = pluginMetrics.counter(KAFKA_WORKER_THREAD_PROCESSING_ERRORS);
shutdownInProgress = new AtomicBoolean(false);
}

@Override
public void start(Buffer<Record<Event>> buffer) {
Properties authProperties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
Properties consumerProperties = getConsumerProperties(topic);
Properties consumerProperties = getConsumerProperties(topic, authProperties);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
int numWorkers = topic.getWorkers();
@@ -189,12 +187,8 @@ KafkaConsumer getConsumer() {
return kafkaConsumer;
}

private Properties getConsumerProperties(final TopicConfig topicConfig) {
Properties properties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(properties, sourceConfig, LOG);
/* if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
throw new RuntimeException("Can't be able to connect to the given Kafka brokers... ");
}*/
private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
Properties properties = (Properties)authProperties.clone();
if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
switch (dnsLookupType) {
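
With this change, the authentication properties are built once in start() and passed into getConsumerProperties(), which starts from a clone rather than re-running KafkaSourceSecurityConfigurer for every topic. Cloning matters because java.util.Properties is mutable and shared across topics. A rough standalone sketch of the idea follows; the keys and group names are illustrative, not the plugin's API.

import java.util.Properties;

public class CloneAuthPropsSketch {
    public static void main(String[] args) {
        // Built once, e.g. from the source-level security settings.
        Properties authProperties = new Properties();
        authProperties.put("security.protocol", "SASL_SSL");

        // Each topic starts from a copy, so topic-specific keys
        // (group.id, fetch sizes, poll intervals, ...) never leak between topics
        // and never mutate the shared auth properties.
        Properties topicA = (Properties) authProperties.clone();
        topicA.put("group.id", "group-a");

        Properties topicB = (Properties) authProperties.clone();
        topicB.put("group.id", "group-b");

        System.out.println(authProperties.containsKey("group.id")); // false
        System.out.println(topicA.getProperty("group.id"));         // group-a
        System.out.println(topicB.getProperty("group.id"));         // group-b
    }
}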
@@ -364,6 +358,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic

private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -374,10 +369,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, topicConfig.getFetchMinBytes());
}

private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
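
The setConsumerTopicProperties hunk above converts the topic's Duration-valued settings to the integer milliseconds Kafka expects and newly wires in fetch.min.bytes, max.poll.interval.ms, and max.partition.fetch.bytes. Below is a hedged, standalone sketch of that mapping with made-up values, assuming only kafka-clients on the classpath; it is not the plugin's code.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTimingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Kafka expects these as integer milliseconds; the plugin converts the
        // configured Durations with toMillis() and narrows the result to int.
        Duration maxPollInterval = Duration.ofMinutes(5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
                (int) maxPollInterval.toMillis());

        // fetch.min.bytes and fetch.max.wait.ms together decide when the broker
        // answers a fetch: whichever threshold is reached first.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        // Per-partition cap on returned data, the new max_partition_fetch_bytes option.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);

        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}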
@@ -84,6 +84,7 @@ void testConfigValues_default() {
assertEquals(TopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS, topicConfig.getConsumerMaxPollRecords());
assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers());
assertEquals(TopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval());
assertEquals(TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
}

@Test
@@ -105,6 +106,7 @@ void testConfigValues_from_yaml() {
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
assertEquals(5, topicConfig.getWorkers().intValue());
assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
assertEquals(10*TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
}

@Test
@@ -23,6 +23,7 @@ log-pipeline:
fetch_min_bytes: 1
retry_backoff: PT100S
consumer_max_poll_records: 500
max_partition_fetch_bytes: 10485760
schema:
registry_url: http://localhost:8081/
version: 1
