Minor fixes to Kafka Source #3174

Merged: 2 commits, Aug 16, 2023
@@ -176,7 +176,7 @@ public void TestJsonRecordsWithNullKey() throws Exception {
             assertThat(map.get("status"), equalTo(true));
             assertThat(map.get("kafka_key"), equalTo(null));
             assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
-            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
+            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
         }
         try (AdminClient adminClient = AdminClient.create(props)) {
             try {
@@ -235,7 +235,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
             assertThat(map.get("id"), equalTo(TEST_ID+i));
             assertThat(map.get("status"), equalTo(true));
             assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
-            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
+            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
         }
         try (AdminClient adminClient = AdminClient.create(props)) {
             try {
@@ -295,7 +295,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
             assertThat(map.get("status"), equalTo(true));
             assertThat(map.get("kafka_key"), equalTo(testKey));
             assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
-            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
+            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
         }
         try (AdminClient adminClient = AdminClient.create(props)) {
             try {
@@ -356,7 +356,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
             assertThat(map.get("status"), equalTo(true));
             assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
             assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
-            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
+            assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
         }
         try (AdminClient adminClient = AdminClient.create(props)) {
             try {
@@ -54,25 +54,29 @@ public boolean hasOnlyOneConfig() {
     }


-    public static class SslAuthConfig {
-        // TODO Add Support for SSL authentication types like
-        // one-way or two-way authentication
+    /*
+     * TODO
+    public static class SslAuthConfig {
+        // TODO Add Support for SSL authentication types like
+        // one-way or two-way authentication

-        public SslAuthConfig() {
+        public SslAuthConfig() {
         }
     }

     @JsonProperty("ssl")
     private SslAuthConfig sslAuthConfig;
+
+    public SslAuthConfig getSslAuthConfig() {
+        return sslAuthConfig;
+    }
+    */

     @Valid
     @JsonProperty("sasl")
     private SaslAuthConfig saslAuthConfig;

-    public SslAuthConfig getSslAuthConfig() {
-        return sslAuthConfig;
-    }
-
     public SaslAuthConfig getSaslAuthConfig() {
         return saslAuthConfig;
     }

Contributor:
Take it or leave it: I would recommend deleting this dead code and leaving a single-line comment instead of commenting out a large code block with no description for the TODO.
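A minimal sketch of that suggestion, assuming the class shape shown in the diff above (SaslAuthConfig and the remaining members are as in the original file; validation annotations trimmed for brevity):

    import com.fasterxml.jackson.annotation.JsonProperty;

    public class AuthConfig {
        // TODO: add support for SSL authentication types (one-way and two-way).

        @JsonProperty("sasl")
        private SaslAuthConfig saslAuthConfig;

        public SaslAuthConfig getSaslAuthConfig() {
            return saslAuthConfig;
        }
    }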
@@ -21,19 +21,16 @@ public class TopicConfig {
     static final boolean DEFAULT_AUTO_COMMIT = false;
     static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
     static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
-    static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
     static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
     static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
-    static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
-    static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
-    static final Duration DEFAULT_MAX_RETRY_DELAY = Duration.ofSeconds(1);
     static final Integer DEFAULT_FETCH_MAX_BYTES = 52428800;
     static final Integer DEFAULT_FETCH_MAX_WAIT = 500;
     static final Integer DEFAULT_FETCH_MIN_BYTES = 1;
     static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
     static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
     static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
-    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
+    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
     static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
     static final Integer DEFAULT_NUM_OF_WORKERS = 2;
     static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);

Contributor (on the DEFAULT_MAX_POLL_INTERVAL change):
Should we set a maximum value on maxRetryDelay? A user could set it back to 300000.
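The comment is anchored on the max_poll_interval change. If a ceiling were wanted there, one option under Bean Validation, which TopicConfig already uses for @Size, is a boolean guard method. The method name and message below are hypothetical, not part of this PR:

    // Hypothetical guard inside TopicConfig, not part of this PR: rejects
    // configurations that push max_poll_interval back up to values such as 300000.
    @AssertTrue(message = "max_poll_interval should not exceed 300 seconds")
    public boolean isMaxPollIntervalWithinBounds() {
        return maxPollInterval == null || maxPollInterval.compareTo(DEFAULT_MAX_POLL_INTERVAL) <= 0;
    }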
@@ -53,16 +50,6 @@ public class TopicConfig {
     @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
     private Integer workers = DEFAULT_NUM_OF_WORKERS;

-    @JsonProperty("max_retry_attempts")
-    @Valid
-    @Size(min = 1, max = Integer.MAX_VALUE, message = " Max retry attempts should lies between 1 and Integer.MAX_VALUE")
-    private Integer maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPT;
-
-    @JsonProperty("max_retry_delay")
-    @Valid
-    @Size(min = 1)
-    private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;
-
     @JsonProperty("serde_format")
     private MessageFormat serdeFormat= MessageFormat.PLAINTEXT;
@@ -82,25 +69,12 @@ public class TopicConfig {
     @JsonProperty("auto_offset_reset")
     private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

-    @JsonProperty("group_name")
-    @Valid
-    @Size(min = 1, max = 255, message = "size of group name should be between 1 and 255")
-    private String groupName;
-
     @JsonProperty("thread_waiting_time")
     private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME;

-    @JsonProperty("max_record_fetch_time")
-    private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME;
-
-    @JsonProperty("max_partition_fetch_bytes")
-    private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;
-
-    @JsonProperty("buffer_default_timeout")
-    @Valid
-    @Size(min = 1)
-    private Duration bufferDefaultTimeout = DEFAULT_BUFFER_TIMEOUT;
-
     @JsonProperty("fetch_max_bytes")
     @Valid
     @Size(min = 1, max = 52428800)
@@ -144,10 +118,6 @@ public void setGroupId(String groupId) {
         this.groupId = groupId;
     }

-    public void setMaxRetryAttempts(Integer maxRetryAttempts) {
-        this.maxRetryAttempts = maxRetryAttempts;
-    }
-
     public MessageFormat getSerdeFormat() {
         return serdeFormat;
     }
@@ -176,14 +146,6 @@ public void setAutoOffsetReset(String autoOffsetReset) {
         this.autoOffsetReset = autoOffsetReset;
     }

-    public String getGroupName() {
-        return groupName;
-    }
-
-    public void setGroupName(String groupName) {
-        this.groupName = groupName;
-    }
-
     public Duration getThreadWaitingTime() {
         return threadWaitingTime;
     }
@@ -192,26 +154,10 @@ public void setThreadWaitingTime(Duration threadWaitingTime) {
         this.threadWaitingTime = threadWaitingTime;
     }

-    public Duration getMaxRecordFetchTime() {
-        return maxRecordFetchTime;
-    }
-
-    public Integer getMaxPartitionFetchBytes() {
-        return maxPartitionFetchBytes;
-    }
-
-    public void setMaxRecordFetchTime(Duration maxRecordFetchTime) {
-        this.maxRecordFetchTime = maxRecordFetchTime;
-    }
-
-    public Duration getBufferDefaultTimeout() {
-        return bufferDefaultTimeout;
-    }
-
-    public void setBufferDefaultTimeout(Duration bufferDefaultTimeout) {
-        this.bufferDefaultTimeout = bufferDefaultTimeout;
-    }
-
     public Integer getFetchMaxBytes() {
         return fetchMaxBytes;
     }
@@ -264,14 +210,6 @@ public void setWorkers(Integer workers) {
         this.workers = workers;
     }

-    public Duration getMaxRetryDelay() {
-        return maxRetryDelay;
-    }
-
-    public void setMaxRetryDelay(Duration maxRetryDelay) {
-        this.maxRetryDelay = maxRetryDelay;
-    }
-
     public Duration getHeartBeatInterval() {
         return heartBeatInterval;
     }
@@ -234,14 +234,20 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
     @Override
     public void run() {
         consumer.subscribe(Arrays.asList(topicName));
+        boolean retryingAfterException = false;
         while (!shutdownInProgress.get()) {
             try {
+                if (retryingAfterException) {
+                    Thread.sleep(10000);
+                }
                 resetOffsets();
                 commitOffsets();
                 consumeRecords();
                 topicMetrics.update(consumer);
+                retryingAfterException = false;
             } catch (Exception exp) {
-                LOG.error("Error while reading the records from the topic {}", topicName, exp);
+                LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp);
+                retryingAfterException = true;
             }
         }
     }
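A side note on the new back-off: Thread.sleep runs inside the try, so an interrupt delivered during the 10-second wait is caught by the generic catch and logged as a read error. A self-contained sketch of the same pattern with explicit interrupt handling; the shutdown treatment here is an assumption, not part of this PR:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Minimal sketch of the retry pattern introduced above: sleep only when
    // the previous iteration failed, and treat an interrupt during the
    // back-off as a shutdown signal.
    public class RetryLoopSketch {
        private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

        public void run() {
            boolean retryingAfterException = false;
            while (!shutdownInProgress.get()) {
                try {
                    if (retryingAfterException) {
                        Thread.sleep(10_000);
                    }
                    doOneIteration();
                    retryingAfterException = false;
                } catch (InterruptedException ie) {
                    // Restore the flag and stop, rather than logging and looping.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception exp) {
                    retryingAfterException = true;
                }
            }
        }

        private void doOneIteration() {
            // Stand-in for resetOffsets(); commitOffsets(); consumeRecords().
        }
    }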
@@ -292,7 +298,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
             eventMetadata.setAttribute("kafka_key", key);
         }
         eventMetadata.setAttribute("kafka_topic", topicName);
-        eventMetadata.setAttribute("kafka_partition", partition);
+        eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));

         return new Record<Event>(event);
     }
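This line is what the integration-test changes above are tracking: the partition index now round-trips through event metadata as text. An illustrative fragment in the style of those tests; the event variable is assumed:

    // Illustrative only: partition 0 is stored via String.valueOf, so it
    // reads back as the string "0" rather than the integer 0.
    final Object partition = event.getMetadata().getAttributes().get("kafka_partition");
    assertThat(partition, equalTo("0"));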
@@ -5,9 +5,10 @@

 package org.opensearch.dataprepper.plugins.kafka.util;

+import com.fasterxml.jackson.annotation.JsonCreator;
+
 import java.util.Arrays;
 import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;

 /**
@@ -19,19 +20,18 @@ public enum MessageFormat {
     PLAINTEXT("plaintext"), JSON("json"), AVRO("avro");

     private static final Map<String, MessageFormat> MESSAGE_FORMAT_MAP = Arrays.stream(MessageFormat.values())
-            .collect(Collectors.toMap(MessageFormat::toString, Function.identity()));
-
-    private final String messageFormatName;
+            .collect(Collectors.toMap(
+                    value -> value.type,
+                    value -> value
+            ));

-    MessageFormat(final String name) {
-        this.messageFormatName = name;
-    }
+    private final String type;

-    @Override
-    public String toString() {
-        return this.messageFormatName;
+    MessageFormat(final String type) {
+        this.type = type;
     }

+    @JsonCreator
     public static MessageFormat getByMessageFormatByName(final String name) {
         return MESSAGE_FORMAT_MAP.get(name.toLowerCase());
     }
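Since getByMessageFormatByName is now the @JsonCreator factory, Jackson resolves config strings to enum constants through the lowercase lookup map. A self-contained usage sketch; the ObjectMapper wiring is illustrative, not from the PR:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

    public class MessageFormatSketch {
        public static void main(String[] args) throws Exception {
            final ObjectMapper mapper = new ObjectMapper();
            // getByMessageFormatByName lower-cases the input, so "JSON" also resolves.
            final MessageFormat format = mapper.readValue("\"JSON\"", MessageFormat.class);
            System.out.println(format); // prints JSON
        }
    }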
@@ -73,8 +73,6 @@ void testConfigValues_default() {
         assertEquals(TopicConfig.DEFAULT_SESSION_TIMEOUT, topicConfig.getSessionTimeOut());
         assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset());
         assertEquals(TopicConfig.DEFAULT_THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime());
-        assertEquals(TopicConfig.DEFAULT_MAX_RECORD_FETCH_TIME, topicConfig.getMaxRecordFetchTime());
-        assertEquals(TopicConfig.DEFAULT_BUFFER_TIMEOUT, topicConfig.getBufferDefaultTimeout());
         assertEquals(TopicConfig.DEFAULT_FETCH_MAX_BYTES, topicConfig.getFetchMaxBytes());
         assertEquals(TopicConfig.DEFAULT_FETCH_MAX_WAIT, topicConfig.getFetchMaxWait());
         assertEquals(TopicConfig.DEFAULT_FETCH_MIN_BYTES, topicConfig.getFetchMinBytes());
@@ -96,13 +94,11 @@ void testConfigValues_from_yaml() {
         assertEquals(45000, topicConfig.getSessionTimeOut().toMillis());
         assertEquals("earliest", topicConfig.getAutoOffsetReset());
         assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime());
-        assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime());
-        assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout());
         assertEquals(52428800, topicConfig.getFetchMaxBytes().longValue());
         assertEquals(500L, topicConfig.getFetchMaxWait().longValue());
         assertEquals(1L, topicConfig.getFetchMinBytes().longValue());
         assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff());
-        assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval());
+        assertEquals(Duration.ofSeconds(300), topicConfig.getMaxPollInterval());
         assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
         assertEquals(5, topicConfig.getWorkers().intValue());
         assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
@@ -118,8 +114,6 @@ void testConfigValues_from_yaml_not_null() {
         assertNotNull(topicConfig.getSessionTimeOut());
         assertNotNull(topicConfig.getAutoOffsetReset());
         assertNotNull(topicConfig.getThreadWaitingTime());
-        assertNotNull(topicConfig.getMaxRecordFetchTime());
-        assertNotNull(topicConfig.getBufferDefaultTimeout());
         assertNotNull(topicConfig.getFetchMaxBytes());
         assertNotNull(topicConfig.getFetchMaxWait());
         assertNotNull(topicConfig.getFetchMinBytes());
@@ -5,7 +5,6 @@ log-pipeline:
         - 127.0.0.1:9093
       topics:
         - name: my-topic-2
-          group_name: kafka-consumer-group-2
           group_id: my-test-group
         - name: my-topic-1
           group_id: my-test-group
@@ -12,12 +12,9 @@ log-pipeline:
           auto_commit: false
           commit_interval: PT5S
           session_timeout: PT45S
-          max_retry_attempts: 1000
           auto_offset_reset: earliest
           thread_waiting_time: PT1S
-          max_record_fetch_time: PT4S
           heart_beat_interval: PT3S
-          buffer_default_timeout: PT5S
           fetch_max_bytes: 52428800
           fetch_max_wait: 500
           fetch_min_bytes: 1