# Fix consumer synchronization. Fix consumer to use user-specified groupId #3100
**KafkaSourceCustomConsumer.java**
```diff
@@ -15,6 +15,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.avro.generic.GenericRecord;
 import org.opensearch.dataprepper.model.log.JacksonLog;
@@ -40,6 +41,8 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
@@ -68,6 +71,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
     private static final ObjectMapper objectMapper = new ObjectMapper();
     private final JsonFactory jsonFactory = new JsonFactory();
     private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
+    private Set<TopicPartition> partitionsToReset;
     private final AcknowledgementSetManager acknowledgementSetManager;
     private final Map<Integer, TopicPartitionCommitTracker> partitionCommitTrackerMap;
     private final Counter positiveAcknowledgementSetCounter;
@@ -95,6 +99,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.pluginMetrics = pluginMetrics;
         this.partitionCommitTrackerMap = new HashMap<>();
+        this.partitionsToReset = new HashSet<>();
         this.schema = MessageFormat.getByMessageFormatByName(schemaType);
         Duration bufferTimeout = Duration.ofSeconds(1);
         this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
```
```diff
@@ -121,29 +126,21 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
             try {
                 int partitionId = partition.partition();
                 if (!partitionCommitTrackerMap.containsKey(partitionId)) {
-                    OffsetAndMetadata committedOffsetAndMetadata = null;
-                    synchronized(consumer) {
-                        committedOffsetAndMetadata = consumer.committed(partition);
-                    }
+                    OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
                     Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
                     partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
                 }
                 OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
                 updateOffsetsToCommit(partition, offsetAndMetadata);
             } catch (Exception e) {
-                LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e);
+                LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
             }
         });
     } else {
         negativeAcknowledgementSetCounter.increment();
         offsets.forEach((partition, offsetRange) -> {
-            try {
-                synchronized(consumer) {
-                    OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
-                    consumer.seek(partition, committedOffsetAndMetadata);
-                }
-            } catch (Exception e) {
-                LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);
+            synchronized(partitionsToReset) {
+                partitionsToReset.add(partition);
             }
         });
     }
```

> **Comment** (on `partitionsToReset.add(partition);`): This seems a nice change. Did you do any performance testing, or did you find any particular pitfalls with the original approach?
>
> **Reply:** I haven't done any performance testing, but having a lock every time the consumer is accessed is definitely not good.
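To make the discussion above concrete, here is a standalone sketch of the deferred-reset pattern this change introduces. The class and method names are hypothetical, and a plain `String` stands in for `TopicPartition` to keep it self-contained. The point is that callback threads never touch the non-thread-safe `KafkaConsumer`; they only record which partitions need a seek, and the polling thread drains the set:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the deferred-reset pattern: callback threads never
// touch the (non-thread-safe) KafkaConsumer; they only record which partitions
// need a seek, and the single polling thread drains that set before poll().
class DeferredResetSketch {
    private final Set<String> partitionsToReset = new HashSet<>();

    // Called from acknowledgement callback threads: cheap, only locks the set.
    void markForReset(final String partition) {
        synchronized (partitionsToReset) {
            partitionsToReset.add(partition);
        }
    }

    // Called only from the polling thread, before poll(): applies the seeks.
    void applyResets() {
        synchronized (partitionsToReset) {
            partitionsToReset.forEach(p ->
                    System.out.println("would seek() back to committed offset on " + p));
            partitionsToReset.clear();
        }
    }

    public static void main(String[] args) {
        final DeferredResetSketch sketch = new DeferredResetSketch();
        sketch.markForReset("my-topic-0"); // e.g. on negative acknowledgement
        sketch.applyResets();              // polling thread, next loop iteration
    }
}
```

The lock is now held only while mutating the set, never across network calls such as `committed()` or `seek()`, which is the cost the reply above is pointing at.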
```diff
@@ -157,10 +154,7 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
 
     public <T> void consumeRecords() throws Exception {
         try {
-            ConsumerRecords<String, T> records = null;
-            synchronized(consumer) {
-                records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
-            }
+            ConsumerRecords<String, T> records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
             if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
                 Map<TopicPartition, Range<Long>> offsets = new HashMap<>();
                 AcknowledgementSet acknowledgementSet = null;
```
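A side note on the `poll` call above: `poll(long)` has been deprecated since Apache Kafka 2.0 in favor of `poll(Duration)`, which bounds the entire call rather than just the fetch wait. A minimal sketch of the Duration-based call, assuming a broker at `localhost:9092` and placeholder topic and group names:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic")); // placeholder topic
            // Duration-based poll: bounds the whole call, not just the fetch wait.
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("polled " + records.count() + " records");
        }
    }
}
```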
```diff
@@ -176,12 +170,27 @@ public <T> void consumeRecords() throws Exception {
                 }
             }
         } catch (AuthenticationException e) {
-            LOG.warn("Authentication Error while doing poll(). Will retry after 10 seconds", e);
+            LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e);
             Thread.sleep(10000);
+        } catch (RecordDeserializationException e) {
+            LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
+                e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
+            consumer.seek(e.topicPartition(), e.offset()+1);
         }
     }
 
-    private void commitOffsets() {
+    private void resetOrCommitOffsets() {
+        synchronized(partitionsToReset) {
+            partitionsToReset.forEach(partition -> {
+                try {
+                    final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
+                    consumer.seek(partition, offsetAndMetadata);
+                } catch (Exception e) {
+                    LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e);
+                }
+            });
+            partitionsToReset.clear();
+        }
         if (topicConfig.getAutoCommit()) {
             return;
         }
```

> **Comment** (on the `"Access Denied"` message): The Apache documentation indicates that this is an authentication error and not necessarily related to access controls.

> **Comment** (on the `"Serialization error"` message): Serialization -> Deserialization. Also increment a metric when we get to metrics.

> **Comment** (on the `consumer.seek(...)` call): Is it really required to explicitly seek past this record? Can we not just log the exception, count a metric, and commit the offset as usual?
>
> **Reply:** As per the documentation, we have to seek past the offset to continue reading. And we cannot commit the offset unless we get acknowledgement that the previously read records are flushed to the sink, right?
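The seek-past-the-bad-record behavior debated above can be shown in isolation. Below is a minimal sketch of the pattern, not the Data Prepper code itself, with placeholder broker, topic, and group names: `poll()` rethrows the same `RecordDeserializationException` on every call until the consumer's position moves past the offending offset, which is why logging alone is not enough.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

public class SkipPoisonRecordSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic")); // placeholder topic
            while (true) {
                try {
                    final ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(500));
                    records.forEach(r -> System.out.println(r.value()));
                } catch (RecordDeserializationException e) {
                    // The consumer is stuck on this offset and will rethrow on every
                    // subsequent poll() until we seek past the undeserializable record.
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }
}
```

This matches the reply above: without the seek the consumer cannot make progress, and committing the offset is not an option while earlier records are still awaiting acknowledgement.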
```diff
@@ -194,13 +203,11 @@ private void commitOffsets() {
                 return;
             }
             try {
-                synchronized(consumer) {
-                    consumer.commitSync();
-                }
+                consumer.commitSync();
                 offsetsToCommit.clear();
                 lastCommitTime = currentTimeMillis;
             } catch (CommitFailedException e) {
-                LOG.error("Failed to commit offsets in topic "+topicName, e);
+                LOG.error("Failed to commit offsets in topic {}", topicName, e);
             }
         }
     }
```
```diff
@@ -211,14 +218,14 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
 
     @Override
     public void run() {
-        try {
-            consumer.subscribe(Arrays.asList(topicName));
-            while (!shutdownInProgress.get()) {
+        consumer.subscribe(Arrays.asList(topicName));
+        while (!shutdownInProgress.get()) {
+            try {
+                resetOrCommitOffsets();
                 consumeRecords();
-                commitOffsets();
+            } catch (Exception exp) {
+                LOG.error("Error while reading the records from the topic...", exp);
             }
-        } catch (Exception exp) {
-            LOG.error("Error while reading the records from the topic...", exp);
         }
     }
 
```

> **Comment** (on `resetOrCommitOffsets();`): Nit: might be better to have separate functions.
>
> **Reply:** I thought about it. Resetting is just three or four lines of code, and it didn't feel worth making it a separate function, especially since it will be a no-op most of the time.

> **Comment** (on the `LOG.error` line): I think we should avoid ellipses in our logs. It's not a big deal, but they seem to indicate either that we trailed off or that we have more to say (not the exception).
>
> **Reply:** Will remove the ellipsis in the next PR.
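One consequence of this restructuring is worth making explicit: with the try/catch outside the while loop, a single exception ended the `run()` thread; with it inside, the loop survives transient failures and retries on the next iteration. A tiny self-contained sketch of the difference, where a hypothetical `doWork()` stands in for `consumeRecords()`:

```java
// Sketch of the control-flow change in run(): catching inside the loop means
// a transient failure skips one iteration instead of killing the whole thread.
public class LoopCatchSketch {
    private static int attempts = 0;

    // Stand-in for consumeRecords(): fails on the first call, succeeds after.
    private static void doWork() {
        if (attempts++ == 0) {
            throw new RuntimeException("transient failure");
        }
        System.out.println("work succeeded on attempt " + attempts);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            try {
                doWork(); // catch per iteration, as in the new run() loop
            } catch (Exception e) {
                System.out.println("iteration " + i + " failed, continuing: " + e.getMessage());
            }
        }
    }
}
```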
```diff
@@ -306,9 +313,8 @@ public void shutdownConsumer(){
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         for (TopicPartition topicPartition : partitions) {
-            synchronized(consumer) {
-                Long committedOffset = consumer.committed(topicPartition).offset();
-                consumer.seek(topicPartition, committedOffset);
+            synchronized(partitionsToReset) {
+                partitionsToReset.add(topicPartition);
             }
         }
     }
```
---
```diff
@@ -16,10 +16,7 @@
 import software.amazon.awssdk.services.kafka.KafkaClient;
 import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest;
 import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse;
-import software.amazon.awssdk.services.kafka.model.InternalServerErrorException;
-import software.amazon.awssdk.services.kafka.model.ConflictException;
-import software.amazon.awssdk.services.kafka.model.ForbiddenException;
-import software.amazon.awssdk.services.kafka.model.UnauthorizedException;
+import software.amazon.awssdk.services.kafka.model.KafkaException;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.services.sts.model.StsException;
 import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
```
```diff
@@ -214,17 +211,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
-            retryable = false;
             try {
                 result = kafkaClient.getBootstrapBrokers(request);
-            } catch (InternalServerErrorException | ConflictException | ForbiddenException | UnauthorizedException | StsException e) {
+            } catch (KafkaException | StsException e) {
                 LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e);
-                retryable = true;
                 try {
                     Thread.sleep(10000);
                 } catch (InterruptedException exp) {}
             } catch (Exception e) {
                 throw new RuntimeException("Failed to get bootstrap server information from MSK.", e);
             }
-        } while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
+        } while (numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
         if (Objects.isNull(result)) {
             throw new RuntimeException("Failed to get bootstrap server information from MSK after trying multiple times with retryable exceptions.");
         }
```

> **Comment** (on the `LOG.debug` line): Should this be LOG.info? Also, make it explicit, e.g. "will retry with exponential backoff" or "will retry after so many seconds". Do we need to log the entire backtrace? Just e.message() may be enough.
>
> **Reply:** I am sure that every time we included just the message, the entire stack trace was later needed. Hopefully, these are not common scenarios.
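If the explicit backoff the reviewer asks for were adopted, it could look roughly like the sketch below. The `retryWithBackoff` helper, its constants, and the stand-in for `kafkaClient.getBootstrapBrokers(request)` are all hypothetical:

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the exponential-backoff retry the reviewer suggests,
// in place of the fixed Thread.sleep(10000) between attempts.
public class BackoffRetrySketch {
    static <T> T retryWithBackoff(final Callable<T> call, final int maxRetries) throws Exception {
        long delayMillis = 1_000L; // first wait; doubles on each retry
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted, surface the last failure
                }
                System.out.printf("attempt %d failed (%s), retrying in %d ms%n",
                        attempt, e.getMessage(), delayMillis);
                Thread.sleep(delayMillis);
                delayMillis = Math.min(delayMillis * 2, 30_000L); // cap the wait
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for kafkaClient.getBootstrapBrokers(request): fails twice, then succeeds.
        final int[] calls = {0};
        final String brokers = retryWithBackoff(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("broker metadata not ready");
            }
            return "b-1.example:9098";
        }, 5);
        System.out.println("got brokers: " + brokers);
    }
}
```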
> **Comment** (on `partitionsToReset`): You could use a concurrent set to avoid synchronization on this object.
>
> **Reply:** Would it be better performance-wise?
>
> **Comment:** I doubt it, but it would ensure that somebody doesn't forget to synchronize calls as the file is maintained.
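For reference, the concurrent-set idea could look like the sketch below, with `ConcurrentHashMap.newKeySet()` replacing the `HashSet` plus `synchronized` blocks (again with `String` standing in for `TopicPartition` to keep it self-contained):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the reviewer's suggestion: a concurrent set makes each add/remove
// thread-safe on its own, so callers cannot forget the synchronized block.
public class ConcurrentResetSetSketch {
    private final Set<String> partitionsToReset = ConcurrentHashMap.newKeySet();

    void markForReset(final String partition) {
        partitionsToReset.add(partition); // no explicit lock needed
    }

    void applyResets() {
        // Weakly consistent iteration: safe even while other threads keep adding.
        for (final String partition : partitionsToReset) {
            System.out.println("would seek() on " + partition);
            partitionsToReset.remove(partition); // remove as applied, not a bulk clear()
        }
    }

    public static void main(String[] args) {
        final ConcurrentResetSetSketch sketch = new ConcurrentResetSetSketch();
        sketch.markForReset("my-topic-0");
        sketch.applyResets();
    }
}
```

One subtlety: the `synchronized` version makes the iterate-and-`clear()` pair atomic, so a concurrent-set version should remove entries as they are applied rather than calling a bulk `clear()`, or a partition added mid-iteration could be silently dropped.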