Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes to Kafka Source #3174

Merged
merged 2 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void TestJsonRecordsWithNullKey() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(null));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -235,7 +235,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
assertThat(map.get("id"), equalTo(TEST_ID+i));
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -356,7 +356,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TopicConfig {
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set a maximum value on maxRetryDelay? A user could set it back to 300000

static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,20 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
@Override
public void run() {
consumer.subscribe(Arrays.asList(topicName));
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
if (retryingAfterException) {
Thread.sleep(10000);
}
resetOffsets();
commitOffsets();
consumeRecords();
topicMetrics.update(consumer);
retryingAfterException = false;
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic {}", topicName, exp);
LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp);
retryingAfterException = true;
}
}
}
Expand Down Expand Up @@ -292,7 +298,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
eventMetadata.setAttribute("kafka_key", key);
}
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", partition);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));

return new Record<Event>(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package org.opensearch.dataprepper.plugins.kafka.util;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -19,19 +20,18 @@ public enum MessageFormat {
PLAINTEXT("plaintext"), JSON("json"), AVRO("avro");

private static final Map<String, MessageFormat> MESSAGE_FORMAT_MAP = Arrays.stream(MessageFormat.values())
.collect(Collectors.toMap(MessageFormat::toString, Function.identity()));

private final String messageFormatName;
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

MessageFormat(final String name) {
this.messageFormatName = name;
}
private final String type;

@Override
public String toString() {
return this.messageFormatName;
MessageFormat(final String type) {
this.type = type;
}

@JsonCreator
public static MessageFormat getByMessageFormatByName(final String name) {
return MESSAGE_FORMAT_MAP.get(name.toLowerCase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void testConfigValues_from_yaml() {
assertEquals(500L, topicConfig.getFetchMaxWait().longValue());
assertEquals(1L, topicConfig.getFetchMinBytes().longValue());
assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff());
assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval());
assertEquals(Duration.ofSeconds(300), topicConfig.getMaxPollInterval());
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
assertEquals(5, topicConfig.getWorkers().intValue());
assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
Expand Down
Loading