Make max partition fetch bytes configurable #3115

Merged · 1 commit · Aug 4, 2023

Changes from all commits
@@ -32,6 +32,7 @@ public class TopicConfig {
static final Integer DEFAULT_FETCH_MIN_BYTES = 1;
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
@@ -92,6 +93,9 @@ public class TopicConfig {
@JsonProperty("max_record_fetch_time")
private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME;

@JsonProperty("max_partition_fetch_bytes")
private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;
Review comment (Member):
We should make use of Data Prepper's ByteCount type. See #3099 for an example of using this in the CloudWatch logs config.
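For illustration, a minimal sketch of what the reviewer suggests, assuming Data Prepper's ByteCount type exposes parse() and getBytes() as in the #3099 example; the names below are assumptions for illustration, not code from this PR:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.model.types.ByteCount;

public class TopicConfig {
    // Assumed API: parse a human-readable size; "1mb" would match the current 1048576 default.
    static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb");

    @JsonProperty("max_partition_fetch_bytes")
    private ByteCount maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;

    // Kafka's consumer property expects a plain number of bytes.
    public long getMaxPartitionFetchBytes() {
        return maxPartitionFetchBytes.getBytes();
    }
}
```

This would let pipeline authors write values like "10mb" instead of raw byte counts.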


@JsonProperty("buffer_default_timeout")
@Valid
@Size(min = 1)
@@ -192,6 +196,10 @@ public Duration getMaxRecordFetchTime() {
return maxRecordFetchTime;
}

public Integer getMaxPartitionFetchBytes() {
return maxPartitionFetchBytes;
}

public void setMaxRecordFetchTime(Duration maxRecordFetchTime) {
this.maxRecordFetchTime = maxRecordFetchTime;
}

@@ -14,7 +14,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
@@ -87,12 +86,10 @@
@SuppressWarnings("deprecation")
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class)
public class KafkaSource implements Source<Record<Event>> {
private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors";
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private AtomicBoolean shutdownInProgress;
private ExecutorService executorService;
private final Counter kafkaWorkerThreadProcessingErrors;
private final PluginMetrics pluginMetrics;
private KafkaSourceCustomConsumer consumer;
private KafkaConsumer kafkaConsumer;
@@ -112,15 +109,16 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pipelineName = pipelineDescription.getPipelineName();
this.kafkaWorkerThreadProcessingErrors = pluginMetrics.counter(KAFKA_WORKER_THREAD_PROCESSING_ERRORS);
shutdownInProgress = new AtomicBoolean(false);
}

@Override
public void start(Buffer<Record<Event>> buffer) {
Properties authProperties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = topic.getGroupId();
Properties consumerProperties = getConsumerProperties(topic);
Properties consumerProperties = getConsumerProperties(topic, authProperties);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
int numWorkers = topic.getWorkers();
@@ -189,12 +187,8 @@ KafkaConsumer getConsumer() {
return kafkaConsumer;
}

private Properties getConsumerProperties(final TopicConfig topicConfig) {
Properties properties = new Properties();
KafkaSourceSecurityConfigurer.setAuthProperties(properties, sourceConfig, LOG);
/* if (isKafkaClusterExists(sourceConfig.getBootStrapServers())) {
throw new RuntimeException("Can't be able to connect to the given Kafka brokers... ");
}*/
private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
Properties properties = (Properties)authProperties.clone();
if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
switch (dnsLookupType) {
@@ -364,6 +358,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic

private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
@@ -374,10 +369,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, topicConfig.getFetchMinBytes());
}

private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
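As background for the new setting: max.partition.fetch.bytes caps how much data the broker returns per partition in a single fetch, while fetch.max.bytes caps the response as a whole; since KIP-74, Kafka still returns the first record batch even when it exceeds these caps, so the consumer can make progress. A minimal stand-alone consumer sketch showing the two knobs together (broker address, group id, and topic name are placeholders, not values from this PR):

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FetchSizingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Per-partition cap: 10 MiB, the value used in the test pipeline YAML below.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024);
        // Whole-response cap: should be at least the per-partition cap to have the intended effect.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic")); // placeholder
        }
    }
}
```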

@@ -84,6 +84,7 @@ void testConfigValues_default() {
assertEquals(TopicConfig.DEFAULT_CONSUMER_MAX_POLL_RECORDS, topicConfig.getConsumerMaxPollRecords());
assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers());
assertEquals(TopicConfig.DEFAULT_HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval());
assertEquals(TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
}

@Test
@@ -105,6 +106,7 @@ void testConfigValues_from_yaml() {
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
assertEquals(5, topicConfig.getWorkers().intValue());
assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
assertEquals(10*TopicConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, topicConfig.getMaxPartitionFetchBytes());
}

@Test

@@ -23,6 +23,7 @@ log-pipeline:
fetch_min_bytes: 1
retry_backoff: PT100S
consumer_max_poll_records: 500
max_partition_fetch_bytes: 10485760
schema:
registry_url: http://localhost:8081/
version: 1
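Note: the 10485760 in this test pipeline is 10 MiB, ten times DEFAULT_MAX_PARTITION_FETCH_BYTES (1048576), which is exactly what the testConfigValues_from_yaml assertion above verifies.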