diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java
index 531d206ad..4879ee983 100644
--- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java
+++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java
@@ -57,7 +57,7 @@ public CodedOutputStreamAndByteBufferWrapper createStream() {
cos.flush();
byteBufferAtomicReference.set(osh.getByteBuffer().flip().asReadOnlyBuffer());
- log.error("byteBufferAtomicReference.get="+byteBufferAtomicReference.get());
+ log.trace("byteBufferAtomicReference.get="+byteBufferAtomicReference.get());
return CompletableFuture.completedFuture(flushCount.incrementAndGet());
}
diff --git a/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java b/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java
index a8e911e20..9e36ea7a3 100644
--- a/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java
+++ b/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java
@@ -66,7 +66,7 @@ public void testHttpTransform() throws IOException {
.build();
var transformedDocument = transformer.transformJson(documentJson);
String transformedJsonOutputStr = emitJson(transformedDocument);
- log.error("transformed json document: "+transformedJsonOutputStr);
+ log.info("transformed json document: "+transformedJsonOutputStr);
Assertions.assertTrue(transformedJsonOutputStr.contains(DUMMY_HOSTNAME_TEST_STRING));
}
}
\ No newline at end of file
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties
index 9c227b84f..702836711 100644
--- a/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties
@@ -6,6 +6,6 @@ appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx=\{\}}{}%n
-rootLogger.level = trace
+rootLogger.level = debug
rootLogger.appenderRefs = stderr
rootLogger.appenderRef.stderr.ref = STDERR
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java
index 68ea3b8a1..93d1bb664 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java
@@ -2,13 +2,14 @@
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.kafka.KafkaBehavioralPolicy;
-import org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer;
+import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.InputStreamOfTraffic;
import java.io.FileInputStream;
import java.io.IOException;
+import java.time.Clock;
import java.time.Duration;
@Slf4j
@@ -31,8 +32,9 @@ private TrafficCaptureSourceFactory() {}
}
if (isKafkaActive) {
- return KafkaProtobufConsumer.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic,
- appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile, new KafkaBehavioralPolicy());
+ return KafkaTrafficCaptureSource.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic,
+ appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile,
+ Clock.systemUTC(), new KafkaBehavioralPolicy());
} else if (isInputFileActive) {
return new InputStreamOfTraffic(new FileInputStream(appParams.inputFilename));
} else {
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java
index ce32ab971..da4ce3ee1 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java
@@ -1,10 +1,12 @@
package org.opensearch.migrations.replay.datatypes;
+import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;
@ToString
+@EqualsAndHashCode()
public class PojoTrafficStreamKey implements ITrafficStreamKey {
private final String nodeId;
private final String connectionId;
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java
new file mode 100644
index 000000000..d110e24c5
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java
@@ -0,0 +1,8 @@
+package org.opensearch.migrations.replay.kafka;
+
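+/**
+ * Carries the Kafka coordinates of a record (partition and offset) along with the consumer
+ * 'generation' that read it, so that commit requests can later be matched back to the
+ * consumer session that produced them.
+ */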
+public interface KafkaCommitOffsetData {
+ int getPartition();
+ long getOffset();
+ int getGeneration();
+
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java
deleted file mode 100644
index 2282121fd..000000000
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-package org.opensearch.migrations.replay.kafka;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import lombok.NonNull;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.opensearch.migrations.coreutils.MetricsAttributeKey;
-import org.opensearch.migrations.coreutils.MetricsEvent;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.opensearch.migrations.coreutils.MetricsLogger;
-import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
-import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey;
-import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey;
-import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
-import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
-import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.PriorityQueue;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * Adapt a Kafka stream into a TrafficCaptureSource.
- *
- * Notice that there's a critical gap between how Kafka accepts commits and how the
- * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may
- * block calls to readNextTrafficStreamChunk() until some time window elapses. This
- * could be a very large window in cases where there were long gaps between recorded
- * requests from the capturing proxy. For example, if a TrafficStream is read and it
- * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk()
- * may not be called for almost an hour. By design, we're not calling Kafka to pull
- * any more messages since we know that we don't have work to do for an hour. Shortly
- * after the hour of waiting begins, Kakfa will notice that this application is no
- * longer calling poll and will kick the consumer out of the client group. Other
- * consumers may connect, though they'll also be kicked out of the group shortly.
- *
- * See
- * ...
- *
- * "Basically if you don't call poll at least as frequently as the configured max interval,
- * then the client will proactively leave the group so that another consumer can take
- * over its partitions. When this happens, you may see an offset commit failure (as
- * indicated by a CommitFailedException thrown from a call to commitSync())."
- *
- * I believe that this can be mitigated, hopefully fully, by adding a keepAlive/do nothing
- * call that the BlockingTrafficSource can use. That can be implemented in a source
- * like this with Kafka by polling, then resetting the position on the stream if we
- * aren't supposed to be reading new data.
- */
-@Slf4j
-public class KafkaProtobufConsumer implements ISimpleTrafficCaptureSource {
-
- @ToString(callSuper = true)
- private static class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey {
- private final int partition;
- private final long offset;
-
- TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int partition, long offset) {
- super(trafficStream);
- this.partition = partition;
- this.offset = offset;
- }
- }
-
- private static class OffsetLifecycleTracker {
- private final PriorityQueue<Long> pQueue = new PriorityQueue<>();
- private long cursorHighWatermark;
-
- private OffsetLifecycleTracker() {
- }
-
- boolean isEmpty() {
- return pQueue.isEmpty();
- }
-
- void add(long offset) {
- cursorHighWatermark = offset;
- pQueue.add(offset);
- }
-
- Optional<Long> removeAndReturnNewHead(TrafficStreamKeyWithKafkaRecordId kafkaRecord) {
- var offsetToRemove = kafkaRecord.offset;
- var topCursor = pQueue.peek();
- var didRemove = pQueue.remove(offsetToRemove);
- assert didRemove : "Expected all live records to have an entry and for them to be removed only once";
- if (topCursor == offsetToRemove) {
- topCursor = Optional.ofNullable(pQueue.peek())
- .orElse(cursorHighWatermark+1); // most recent cursor was previously popped
- log.atDebug().setMessage("Commit called for {}, and new topCursor={}")
- .addArgument(offsetToRemove).addArgument(topCursor).log();
- return Optional.of(topCursor);
- } else {
- log.atDebug().setMessage("Commit called for {}, but topCursor={}")
- .addArgument(offsetToRemove).addArgument(topCursor).log();
- return Optional.empty();
- }
- }
- }
-
- private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer");
-
- public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1);
-
- private final Consumer<String, byte[]> kafkaConsumer;
- private final ConcurrentHashMap<Integer, OffsetLifecycleTracker> partitionToOffsetLifecycleTrackerMap;
- private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> nextSetOfCommitsMap;
- private final Object offsetLifecycleLock = new Object();
- private final String topic;
- private final KafkaBehavioralPolicy behavioralPolicy;
- private final AtomicInteger trafficStreamsRead;
-
- public KafkaProtobufConsumer(Consumer<String, byte[]> kafkaConsumer, String topic) {
- this(kafkaConsumer, topic, new KafkaBehavioralPolicy());
- }
-
- public KafkaProtobufConsumer(Consumer<String, byte[]> kafkaConsumer, @NonNull String topic,
- KafkaBehavioralPolicy behavioralPolicy) {
- this.kafkaConsumer = kafkaConsumer;
- this.topic = topic;
- this.behavioralPolicy = behavioralPolicy;
- kafkaConsumer.subscribe(Collections.singleton(topic));
- trafficStreamsRead = new AtomicInteger();
-
- partitionToOffsetLifecycleTrackerMap = new ConcurrentHashMap<>();
- nextSetOfCommitsMap = new ConcurrentHashMap<>();
- }
-
- public static KafkaProtobufConsumer buildKafkaConsumer(@NonNull String brokers,
- @NonNull String topic,
- @NonNull String groupId,
- boolean enableMSKAuth,
- String propertyFilePath,
- KafkaBehavioralPolicy behavioralPolicy) throws IOException {
- var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath);
- return new KafkaProtobufConsumer(new KafkaConsumer<>(kafkaProps), topic, behavioralPolicy);
- }
-
- public static Properties buildKafkaProperties(@NonNull String brokers,
- @NonNull String groupId,
- boolean enableMSKAuth,
- String propertyFilePath) throws IOException {
- var kafkaProps = new Properties();
- kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- if (propertyFilePath != null) {
- try (InputStream input = new FileInputStream(propertyFilePath)) {
- kafkaProps.load(input);
- } catch (IOException ex) {
- log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath);
- throw ex;
- }
- }
- // Required for using SASL auth with MSK public endpoint
- if (enableMSKAuth) {
- kafkaProps.setProperty("security.protocol", "SASL_SSL");
- kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM");
- kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
- kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
- }
- kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
- kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- return kafkaProps;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<List<ITrafficStreamWithKey>> readNextTrafficStreamChunk() {
- return CompletableFuture.supplyAsync(this::readNextTrafficStreamSynchronously);
- }
-
- public List<ITrafficStreamWithKey> readNextTrafficStreamSynchronously() {
- try {
- ConsumerRecords<String, byte[]> records;
- records = safeCommitAndPollWithSwallowedRuntimeExceptions();
- Stream<ITrafficStreamWithKey> trafficStream = StreamSupport.stream(records.spliterator(), false)
- .map(kafkaRecord -> {
- try {
- TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value());
- // Ensure we increment trafficStreamsRead even at a higher log level
- metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA)
- .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId())
- .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic)
- .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit();
- addOffset(kafkaRecord.partition(), kafkaRecord.offset());
- var key = new TrafficStreamKeyWithKafkaRecordId(ts, kafkaRecord.partition(), kafkaRecord.offset());
- log.atTrace().setMessage(()->"Parsed traffic stream #{}: {} {}")
- .addArgument(trafficStreamsRead.incrementAndGet())
- .addArgument(key)
- .addArgument(ts)
- .log();
- return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key);
- } catch (InvalidProtocolBufferException e) {
- RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e);
- metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError)
- .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic).emit();
- if (recordError != null) {
- throw recordError;
- }
- return null;
- }
- }).filter(Objects::nonNull);
- return trafficStream.collect(Collectors.toList());
- } catch (Exception e) {
- log.error("Terminating Kafka traffic stream");
- throw e;
- }
- }
-
- private ConsumerRecords<String, byte[]> safeCommitAndPollWithSwallowedRuntimeExceptions() {
- try {
- synchronized (offsetLifecycleLock) {
- if (!nextSetOfCommitsMap.isEmpty()) {
- log.atDebug().setMessage(()->"Committing "+nextSetOfCommitsMap).log();
- kafkaConsumer.commitSync(nextSetOfCommitsMap);
- log.atDebug().setMessage(()->"Done committing "+nextSetOfCommitsMap).log();
- nextSetOfCommitsMap.clear();
- }
- }
-
- var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT);
- log.atInfo().setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log();
- log.atDebug().setMessage(()->"All positions: {"+kafkaConsumer.assignment().stream()
- .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log();
- log.atDebug().setMessage(()->"All COMMITTED positions: {"+kafkaConsumer.assignment().stream()
- .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log();
- return records;
- } catch (RuntimeException e) {
- log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. " +
- "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log();
- return new ConsumerRecords<>(Collections.emptyMap());
- }
- }
-
- private void addOffset(int partition, long offset) {
- synchronized (offsetLifecycleLock) {
- var offsetTracker = partitionToOffsetLifecycleTrackerMap.computeIfAbsent(partition, p ->
- new OffsetLifecycleTracker());
- offsetTracker.add(offset);
- }
- }
-
- @Override
- public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) {
- if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) {
- throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+
- " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")");
- }
- var kafkaTsk = (TrafficStreamKeyWithKafkaRecordId) trafficStreamKey;
- var p = kafkaTsk.partition;
- Optional<Long> newHeadValue;
- synchronized (offsetLifecycleLock) {
- var tracker = partitionToOffsetLifecycleTrackerMap.get(p);
- newHeadValue = tracker.removeAndReturnNewHead(kafkaTsk);
- newHeadValue.ifPresent(o -> {
- if (tracker.isEmpty()) {
- partitionToOffsetLifecycleTrackerMap.remove(p);
- }
- nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o));
- });
- }
- }
-
- @Override
- public void close() throws IOException {
- kafkaConsumer.close();
- log.info("Kafka consumer closed successfully.");
- }
-}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java
new file mode 100644
index 000000000..4d6de51df
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java
@@ -0,0 +1,221 @@
+package org.opensearch.migrations.replay.kafka;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.opensearch.migrations.coreutils.MetricsAttributeKey;
+import org.opensearch.migrations.coreutils.MetricsEvent;
+import org.opensearch.migrations.coreutils.MetricsLogger;
+import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
+import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey;
+import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
+import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
+import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Adapt a Kafka stream into a TrafficCaptureSource.
+ *
+ * Notice that there's a critical gap between how Kafka accepts commits and how the
+ * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may
+ * block calls to readNextTrafficStreamChunk() until some time window elapses. This
+ * could be a very large window in cases where there were long gaps between recorded
+ * requests from the capturing proxy. For example, if a TrafficStream is read and
+ * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk()
+ * may not be called for almost an hour. By design, we're not calling Kafka to pull
+ * any more messages since we know that we don't have work to do for an hour. Shortly
+ * after the hour of waiting begins, Kafka will notice that this application is no
+ * longer calling poll and will kick the consumer out of the client group.
+ *
+ * See
+ * ...
+ *
+ * "Basically if you don't call poll at least as frequently as the configured max interval,
+ * then the client will proactively leave the group so that another consumer can take
+ * over its partitions. When this happens, you may see an offset commit failure (as
+ * indicated by a CommitFailedException thrown from a call to commitSync())."
+ *
+ * Since the Kafka client requires all calls to be made from the same thread, we can't
+ * simply run a background job to keep the client warm. We need the caller to touch
+ * this object periodically to keep the connection alive.
+ */
+@Slf4j
+public class KafkaTrafficCaptureSource implements ISimpleTrafficCaptureSource {
+
+ public static final String MAX_POLL_INTERVAL_KEY = "max.poll.interval.ms";
+ // see https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10
+ public static final String DEFAULT_POLL_INTERVAL_MS = "60000";
+ private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer");
+
+
+ final TrackingKafkaConsumer trackingKafkaConsumer;
+ private final AtomicLong trafficStreamsRead;
+ private final KafkaBehavioralPolicy behavioralPolicy;
+
+ public KafkaTrafficCaptureSource(Consumer<String, byte[]> kafkaConsumer, String topic, Duration keepAliveInterval) {
+ this(kafkaConsumer, topic, keepAliveInterval, Clock.systemUTC(), new KafkaBehavioralPolicy());
+ }
+
+ public KafkaTrafficCaptureSource(Consumer<String, byte[]> kafkaConsumer,
+ @NonNull String topic,
+ Duration keepAliveInterval,
+ Clock clock,
+ @NonNull KafkaBehavioralPolicy behavioralPolicy)
+ {
+ trackingKafkaConsumer = new TrackingKafkaConsumer(kafkaConsumer, topic, keepAliveInterval, clock);
+ trafficStreamsRead = new AtomicLong();
+ this.behavioralPolicy = behavioralPolicy;
+ kafkaConsumer.subscribe(Collections.singleton(topic), trackingKafkaConsumer);
+ }
+
+ public static KafkaTrafficCaptureSource buildKafkaConsumer(@NonNull String brokers,
+ @NonNull String topic,
+ @NonNull String groupId,
+ boolean enableMSKAuth,
+ String propertyFilePath,
+ @NonNull Clock clock,
+ @NonNull KafkaBehavioralPolicy behavioralPolicy)
+ throws IOException
+ {
+ var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath);
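+ // Make sure max.poll.interval.ms has a value, then derive the keep-alive interval from it so
+ // that touch()/poll() calls are guaranteed to happen before the broker's poll-interval deadline.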
+ kafkaProps.putIfAbsent(MAX_POLL_INTERVAL_KEY, DEFAULT_POLL_INTERVAL_MS);
+ var pollPeriod = Duration.ofMillis(Long.valueOf((String)kafkaProps.get(MAX_POLL_INTERVAL_KEY)));
+ var keepAlivePeriod = getKeepAlivePeriodFromPollPeriod(pollPeriod);
+ return new KafkaTrafficCaptureSource(new KafkaConsumer<>(kafkaProps),
+ topic, keepAlivePeriod, clock, behavioralPolicy);
+ }
+
+ /**
+ * We'll have to 'maintain' touches more frequently than the poll period; otherwise the
+ * consumer will fall out of the group, putting all the in-flight commits at risk. Notice
+ * that this doesn't have a bearing on heartbeats, which themselves are maintained through
+ * Kafka Consumer poll() calls. When those poll calls stop, so does the heartbeat, which
+ * is more sensitive, but managed via the 'session.timeout.ms' property.
+ */
+ private static Duration getKeepAlivePeriodFromPollPeriod(Duration pollPeriod) {
+ return pollPeriod.dividedBy(2);
+ }
+
+ public static Properties buildKafkaProperties(@NonNull String brokers,
+ @NonNull String groupId,
+ boolean enableMSKAuth,
+ String propertyFilePath) throws IOException {
+ var kafkaProps = new Properties();
+ kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ if (propertyFilePath != null) {
+ try (InputStream input = new FileInputStream(propertyFilePath)) {
+ kafkaProps.load(input);
+ } catch (IOException ex) {
+ log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath);
+ throw ex;
+ }
+ }
+ // Required for using SASL auth with MSK public endpoint
+ if (enableMSKAuth) {
+ kafkaProps.setProperty("security.protocol", "SASL_SSL");
+ kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM");
+ kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
+ kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
+ }
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return kafkaProps;
+ }
+
+ @Override
+ public void touch() {
+ trackingKafkaConsumer.touch();
+ }
+
+ /**
+ * If messages are outstanding, we need to keep the connection alive; otherwise, there's no
+ * reason to. It's OK to fall out of the group and rejoin once ready.
+ * @return the deadline for the next touch() call, or empty if no keep-alive is currently needed
+ */
+ @Override
+ public Optional<Instant> getNextRequiredTouch() {
+ return trackingKafkaConsumer.getNextRequiredTouch();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<List<ITrafficStreamWithKey>> readNextTrafficStreamChunk() {
+ log.atTrace().setMessage("readNextTrafficStreamChunk()").log();
+ return CompletableFuture.supplyAsync(() -> {
+ log.atTrace().setMessage("async...readNextTrafficStreamChunk()").log();
+ return readNextTrafficStreamSynchronously();
+ });
+ }
+
+ public List<ITrafficStreamWithKey> readNextTrafficStreamSynchronously() {
+ log.atTrace().setMessage("readNextTrafficStreamSynchronously()").log();
+ try {
+ var records = trackingKafkaConsumer.getNextBatchOfRecords();
+ Stream<ITrafficStreamWithKey> trafficStream = StreamSupport.stream(records.spliterator(), false)
+ .map(kafkaRecord -> {
+ try {
+ TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value());
+ // Ensure we increment trafficStreamsRead even at a higher log level
+ metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA)
+ .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId())
+ .setAttribute(MetricsAttributeKey.TOPIC_NAME, trackingKafkaConsumer.topic)
+ .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit();
+ var key = trackingKafkaConsumer.createAndTrackKey(kafkaRecord.partition(), kafkaRecord.offset(),
+ ck -> new TrafficStreamKeyWithKafkaRecordId(ts, ck));
+ var trafficStreamsSoFar = trafficStreamsRead.incrementAndGet();
+ log.atTrace().setMessage(()->"Parsed traffic stream #" + trafficStreamsSoFar +
+ ": " + key + " " + ts).log();
+ return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key);
+ } catch (InvalidProtocolBufferException e) {
+ RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e);
+ metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError)
+ .setAttribute(MetricsAttributeKey.TOPIC_NAME, trackingKafkaConsumer.topic).emit();
+ if (recordError != null) {
+ throw recordError;
+ }
+ return null;
+ }
+ }).filter(Objects::nonNull);
+ return trafficStream.collect(Collectors.toList());
+ } catch (Exception e) {
+ log.atError().setCause(e).setMessage("Terminating Kafka traffic stream due to exception").log();
+ throw e;
+ }
+ }
+
+ @Override
+ public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) {
+ if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) {
+ throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+
+ " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")");
+ }
+ trackingKafkaConsumer.commitKafkaKey((TrafficStreamKeyWithKafkaRecordId) trafficStreamKey);
+ }
+
+ @Override
+ public void close() throws IOException {
+ trackingKafkaConsumer.close();
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java
new file mode 100644
index 000000000..98fc02332
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java
@@ -0,0 +1,49 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+/**
+ * This uses a PriorityQueue to find the MINIMUM offset that has yet to be 'committed'.
+ * This class assumes that add() will be called with ascending offsets and that
+ * removeAndReturnNewHead may be called in any order. removeAndReturnNewHead returns
+ * the new commit offset for the partition that this object is associated with.
+ * It's also assumed that callers MUST call removeAndReturnNewHead for every offset
+ * that was previously added for commit points to be advanced.
+ */
+@Slf4j
+class OffsetLifecycleTracker {
+ private final PriorityQueue<Long> pQueue = new PriorityQueue<>();
+ private long cursorHighWatermark;
+ final int consumerConnectionGeneration;
+
+ OffsetLifecycleTracker(int generation) {
+ this.consumerConnectionGeneration = generation;
+ }
+
+ boolean isEmpty() {
+ return pQueue.isEmpty();
+ }
+
+ void add(long offset) {
+ cursorHighWatermark = offset;
+ pQueue.add(offset);
+ }
+
+ Optional<Long> removeAndReturnNewHead(long offsetToRemove) {
+ var topCursor = pQueue.peek();
+ var didRemove = pQueue.remove(offsetToRemove);
+ assert didRemove : "Expected all live records to have an entry and for them to be removed only once";
+ if (topCursor == offsetToRemove) {
+ topCursor = Optional.ofNullable(pQueue.peek())
+ .orElse(cursorHighWatermark + 1); // most recent cursor was previously popped
+ log.atDebug().setMessage("Commit called for " + offsetToRemove + ", and new topCursor=" + topCursor).log();
+ return Optional.of(topCursor);
+ } else {
+ log.atDebug().setMessage("Commit called for " + offsetToRemove + ", but topCursor=" + topCursor).log();
+ return Optional.empty();
+ }
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java
new file mode 100644
index 000000000..bf3d24a62
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java
@@ -0,0 +1,12 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class PojoKafkaCommitOffsetData implements KafkaCommitOffsetData {
+ final int generation;
+ final int partition;
+ final long offset;
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java
new file mode 100644
index 000000000..6379869d0
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java
@@ -0,0 +1,240 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.event.Level;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * This is a wrapper around Kafka's Consumer class that provides tracking of partitions
+ * and their current (asynchronously 'committed' by the calling contexts) offsets. It
+ * manages those offsets and the 'active' set of records that have been rendered by this
+ * consumer, decides when to pause and resume polling, and handles consumer rebalances.
+ */
+@Slf4j
+public class TrackingKafkaConsumer implements ConsumerRebalanceListener {
+
+ /**
+ * The keep-alive should already be set to a fraction of the max poll timeout for
+ * the consumer (done outside of this class). The keep-alive tells this class how
+ * often the caller should be interacting with touch() and poll() calls. As such,
+ * we want to set up a long enough poll to not overwhelm a broker or client with
+ * many empty poll() message responses. We also don't want to poll() for so long,
+ * when there aren't messages, that there isn't enough time left to commit them,
+ * which happens after we poll() (on the same thread, as per Consumer requirements).
+ */
+ public static final int POLL_TIMEOUT_KEEP_ALIVE_DIVISOR = 4;
+ private final Consumer<String, byte[]> kafkaConsumer;
+
+ final String topic;
+ private final Clock clock;
+ /**
+ * This collection holds the definitive list, as per the rebalance callback, of the partitions
+ * that are currently assigned to this consumer. The objects are removed when partitions are
+ * revoked and new objects are only created/inserted when they're assigned. That means that
+ * the generations of each OffsetLifecycleTracker value may be different.
+ */
+ final Map<Integer, OffsetLifecycleTracker> partitionToOffsetLifecycleTrackerMap;
+ // loosening visibility so that a unit test can read this
+ final Map<TopicPartition, OffsetAndMetadata> nextSetOfCommitsMap;
+ private final Duration keepAliveInterval;
+ private final AtomicReference<Instant> lastTouchTimeRef;
+ private int consumerConnectionGeneration;
+ private boolean hasPendingCommitsToSend;
+
+ public TrackingKafkaConsumer(Consumer<String, byte[]> kafkaConsumer, String topic,
+ Duration keepAliveInterval, Clock c) {
+ this.kafkaConsumer = kafkaConsumer;
+ this.topic = topic;
+ this.clock = c;
+ this.partitionToOffsetLifecycleTrackerMap = new HashMap<>();
+ this.nextSetOfCommitsMap = new HashMap<>();
+ this.lastTouchTimeRef = new AtomicReference<>(Instant.EPOCH);
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ safeCommit();
+ partitions.forEach(p->{
+ nextSetOfCommitsMap.remove(new TopicPartition(topic, p.partition()));
+ partitionToOffsetLifecycleTrackerMap.remove(p.partition());
+ });
+ if (hasPendingCommitsToSend) {
+ hasPendingCommitsToSend = partitionToOffsetLifecycleTrackerMap.values().stream()
+ .anyMatch(olt -> !olt.isEmpty());
+ }
+ log.atWarn().setMessage(()->this+"partitions revoked for "+partitions.stream()
+ .map(p->p+"").collect(Collectors.joining(","))).log();
+ }
+
+ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
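+ // Each new assignment starts a new consumer 'generation'; commitKafkaKey() uses this to drop
+ // commit requests for keys that were handed out before a rebalance.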
+ ++consumerConnectionGeneration;
+ partitions.forEach(p->partitionToOffsetLifecycleTrackerMap.computeIfAbsent(p.partition(),
+ x->new OffsetLifecycleTracker(consumerConnectionGeneration)));
+ log.atWarn().setMessage(()->this+"partitions added for "+partitions.stream()
+ .map(p->p+"").collect(Collectors.joining(","))).log();
+ }
+
+ public void close() {
+ log.atInfo().setMessage(()->"Kafka consumer closing. " +
+ "Committing (implicitly by Kafka's consumer): " + nextCommitsToString()).log();
+ kafkaConsumer.close();
+ }
+
+ public Optional<Instant> getNextRequiredTouch() {
+ return hasPendingCommitsToSend ? Optional.of(lastTouchTimeRef.get().plus(keepAliveInterval)) : Optional.empty();
+ }
+
+ public void touch() {
+ log.trace("touch() called.");
+ pause();
+ try {
+ var records = kafkaConsumer.poll(Duration.ZERO);
+ if (!records.isEmpty()) {
+ throw new IllegalStateException("Expected no entries once the consumer was paused. " +
+ "This may have happened because a new assignment slipped into the consumer AFTER pause calls.");
+ }
+ } catch (IllegalStateException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ log.atWarn().setCause(e).setMessage("Unable to poll the topic: " + topic + " with our Kafka consumer. " +
+ "Swallowing and awaiting next metadata refresh to try again.").log();
+ } finally {
+ resume();
+ }
+ safeCommit();
+ lastTouchTimeRef.set(clock.instant());
+ }
+
+ private void pause() {
+ var activePartitions = kafkaConsumer.assignment();
+ try {
+ kafkaConsumer.pause(activePartitions);
+ } catch (IllegalStateException e) {
+ log.atError().setCause(e).setMessage(()->"Unable to pause the topic partitions: " + topic + ". " +
+ "The active partitions passed here : " + activePartitions.stream()
+ .map(x->x.toString()).collect(Collectors.joining(",")) + ". " +
+ "The active partitions as tracked here are: " + getActivePartitions().stream()
+ .map(x->x.toString()).collect(Collectors.joining(",")) + ". " +
+ "The active partitions according to the consumer: " + kafkaConsumer.assignment().stream()
+ .map(x->x.toString()).collect(Collectors.joining(","))
+ ).log();
+ }
+ }
+
+ private void resume() {
+ var activePartitions = kafkaConsumer.assignment();
+ try {
+ kafkaConsumer.resume(activePartitions);
+ } catch (IllegalStateException e) {
+ log.atError().setCause(e).setMessage(()->"Unable to resume the topic partitions: " + topic + ". " +
+ "This may not be a fatal error for the entire process as the consumer should eventually"
+ + " rejoin and rebalance. " +
+ "The active partitions passed here : " + activePartitions.stream()
+ .map(x->x.toString()).collect(Collectors.joining(",")) + ". " +
+ "The active partitions as tracked here are: " + getActivePartitions().stream()
+ .map(x->x.toString()).collect(Collectors.joining(",")) + ". " +
+ "The active partitions according to the consumer: " + kafkaConsumer.assignment().stream()
+ .map(x->x.toString()).collect(Collectors.joining(","))
+ ).log();
+ }
+ }
+
+ public <K> K createAndTrackKey(int partition, long offset, Function<KafkaCommitOffsetData, K> keyFactory) {
+ var offsetTracker = partitionToOffsetLifecycleTrackerMap.get(partition);
+ offsetTracker.add(offset);
+ return keyFactory.apply(new PojoKafkaCommitOffsetData(consumerConnectionGeneration, partition, offset));
+ }
+
+ private Collection<TopicPartition> getActivePartitions() {
+ return partitionToOffsetLifecycleTrackerMap.keySet().stream()
+ .map(p->new TopicPartition(topic,p)).collect(Collectors.toList());
+ }
+
+ public ConsumerRecords<String, byte[]> getNextBatchOfRecords() {
+ var records = safePollWithSwallowedRuntimeExceptions();
+ safeCommit();
+ return records;
+ }
+
+ private ConsumerRecords<String, byte[]> safePollWithSwallowedRuntimeExceptions() {
+ try {
+ lastTouchTimeRef.set(clock.instant());
+ var records = kafkaConsumer.poll(keepAliveInterval.dividedBy(POLL_TIMEOUT_KEEP_ALIVE_DIVISOR));
+ log.atLevel(records.isEmpty()? Level.TRACE:Level.INFO)
+ .setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log();
+ log.atTrace().setMessage(()->"All positions: {"+kafkaConsumer.assignment().stream()
+ .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log();
+ log.atTrace().setMessage(()->"All previously COMMITTED positions: {"+kafkaConsumer.assignment().stream()
+ .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log();
+ return records;
+ } catch (RuntimeException e) {
+ log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. " +
+ "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log();
+ return new ConsumerRecords<>(Collections.emptyMap());
+ }
+ }
+
+ void commitKafkaKey(KafkaCommitOffsetData kafkaTsk) {
+ if (kafkaTsk.getGeneration() != consumerConnectionGeneration) {
+ log.atWarn().setMessage(()->"trafficKey's generation (" + kafkaTsk.getGeneration() + ") is not current (" +
+ consumerConnectionGeneration + "). Dropping this commit request since the record would have " +
+ "been or will be handled again by a current consumer within this process or another. Full key=" +
+ kafkaTsk).log();
+ return;
+ }
+ var p = kafkaTsk.getPartition();
+ Optional<Long> newHeadValue;
+
+ newHeadValue = partitionToOffsetLifecycleTrackerMap.get(p).removeAndReturnNewHead(kafkaTsk.getOffset());
+ newHeadValue.ifPresent(o -> {
+ hasPendingCommitsToSend = true;
+ nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o));
+ });
+ }
+
+ private void safeCommit() {
+ try {
+ if (hasPendingCommitsToSend) {
+ log.atDebug().setMessage(() -> "Committing " + nextSetOfCommitsMap).log();
+ kafkaConsumer.commitSync(nextSetOfCommitsMap);
+ log.atDebug().setMessage(() -> "Done committing " + nextSetOfCommitsMap).log();
+ nextSetOfCommitsMap.clear();
+ }
+ } catch (RuntimeException e) {
+ log.atWarn().setCause(e)
+ .setMessage(() -> "Error while committing. " +
+ "Another consumer may already be processing messages before these commits. " +
+ "Commits ARE NOT being discarded here, with the expectation that the revoked callback " +
+ "(onPartitionsRevoked) will be called. " +
+ "Within that method, commits for unassigned partitions will be discarded. " +
+ "After that, touch() or poll() will trigger another commit attempt." +
+ "Those calls will occur in the near future if assigned partitions have pending commits." +
+ nextSetOfCommitsMap.entrySet().stream()
+ .map(kvp -> kvp.getKey() + "->" + kvp.getValue()).collect(Collectors.joining(",")))
+ .log();
+ }
+ }
+
+ String nextCommitsToString() {
+ return "nextCommits="+nextSetOfCommitsMap.entrySet().stream()
+ .map(kvp->kvp.getKey()+"->"+kvp.getValue()).collect(Collectors.joining(","));
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java
new file mode 100644
index 000000000..aa86e96c7
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java
@@ -0,0 +1,27 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey;
+import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
+
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@Getter
+class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements KafkaCommitOffsetData {
+ private final int generation;
+ private final int partition;
+ private final long offset;
+
+ TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, KafkaCommitOffsetData ok) {
+ this(trafficStream, ok.getGeneration(), ok.getPartition(), ok.getOffset());
+ }
+
+ TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int generation, int partition, long offset) {
+ super(trafficStream);
+ this.generation = generation;
+ this.partition = partition;
+ this.offset = offset;
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
index a22ad2549..a84e7b80b 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
@@ -15,6 +15,7 @@
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -40,9 +41,9 @@ public class BlockingTrafficSource implements ITrafficCaptureSource, BufferedFlo
* Limit the number of readers to one at a time and only if we haven't yet maxed out our time buffer
*/
private final Semaphore readGate;
-
private final Duration bufferTimeWindow;
+
public BlockingTrafficSource(ISimpleTrafficCaptureSource underlying,
Duration bufferTimeWindow) {
this.underlyingSource = underlying;
@@ -89,27 +90,13 @@ public Duration getBufferTimeWindow() {
@Override
public CompletableFuture<List<ITrafficStreamWithKey>>
readNextTrafficStreamChunk() {
- var trafficStreamListFuture =
- CompletableFuture.supplyAsync(() -> {
- if (stopReadingAtRef.get().equals(Instant.EPOCH)) { return null; }
- while (stopReadingAtRef.get().isBefore(lastTimestampSecondsRef.get())) {
- try {
- log.atInfo().setMessage(
- "blocking until signaled to read the next chunk last={} stop={}")
- .addArgument(lastTimestampSecondsRef.get())
- .addArgument(stopReadingAtRef.get())
- .log();
- readGate.acquire();
- } catch (InterruptedException e) {
- log.atWarn().setCause(e).log("Interrupted while waiting to read more data");
- Thread.currentThread().interrupt();
- break;
- }
- }
- return null;
- },
- task -> new Thread(task).start())
- .thenCompose(v->underlyingSource.readNextTrafficStreamChunk());
+ log.info("BlockingTrafficSource::readNext");
+ var trafficStreamListFuture = CompletableFuture
+ .supplyAsync(this::blockIfNeeded, task -> new Thread(task).start())
+ .thenCompose(v->{
+ log.info("BlockingTrafficSource::composing");
+ return underlyingSource.readNextTrafficStreamChunk();
+ });
return trafficStreamListFuture.whenComplete((v,t)->{
if (t != null) {
return;
@@ -126,6 +113,42 @@ public Duration getBufferTimeWindow() {
});
}
+ private Void blockIfNeeded() {
+ if (stopReadingAtRef.get().equals(Instant.EPOCH)) { return null; }
+ log.atInfo().setMessage(()->"stopReadingAtRef="+stopReadingAtRef+
+ " lastTimestampSecondsRef="+lastTimestampSecondsRef).log();
+ while (stopReadingAtRef.get().isBefore(lastTimestampSecondsRef.get())) {
+ try {
+ log.atInfo().setMessage("blocking until signaled to read the next chunk last={} stop={}")
+ .addArgument(lastTimestampSecondsRef.get())
+ .addArgument(stopReadingAtRef.get())
+ .log();
+ var nextTouchOp = underlyingSource.getNextRequiredTouch();
+ if (nextTouchOp.isEmpty()) {
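+ // The source doesn't need keep-alive calls, so just block until another thread
+ // signals (via the read gate) that the time window has been advanced.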
+ readGate.acquire();
+ } else {
+ var nextInstant = nextTouchOp.get();
+ final var nowTime = Instant.now();
+ var waitIntervalMs = Duration.between(nowTime, nextInstant).toMillis();
+ log.atDebug().setMessage(()->"Next touch at " + nextInstant +
+ " ... in " + waitIntervalMs + "ms (now="+nowTime+")").log();
+ if (waitIntervalMs <= 0) {
+ underlyingSource.touch();
+ } else {
+ // if this doesn't succeed, we'll loop around & likely do a touch, then loop around again.
+ // if it DOES succeed, we'll loop around and make sure that there's not another reason to stop
+ readGate.tryAcquire(waitIntervalMs, TimeUnit.MILLISECONDS);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.atWarn().setCause(e).log("Interrupted while waiting to read more data");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ return null;
+ }
+
@Override
public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) throws IOException {
underlyingSource.commitTrafficStream(trafficStreamKey);
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java
index 43838a2a5..625bde671 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java
@@ -4,7 +4,10 @@
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public interface ITrafficCaptureSource extends Closeable {
@@ -14,4 +17,17 @@ public interface ITrafficCaptureSource extends Closeable {
default void commitTrafficStream(ITrafficStreamKey trafficStreamKey) throws IOException {}
default void close() throws IOException {}
+
+ /**
+ * Keep-alive call to be used by the BlockingTrafficSource to keep this connection alive if
+ * this is required.
+ */
+ default void touch() {}
+
+ /**
+ * @return The time by which the next call to touch() must be completed for this source to stay
+ * active. Empty indicates that touch() does not need to be called to keep the
+ * source active.
+ */
+ default Optional getNextRequiredTouch() { return Optional.empty(); }
}
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java
index a50d4c953..4e0f3765c 100644
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java
@@ -80,7 +80,7 @@ public void testSingleStreamWithCloseIsCommitted() throws Throwable {
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(0,
httpServer.localhostEndpoint(), new IndexWatchingReceiverFactory(), trafficSourceSupplier);
Assertions.assertEquals(1, trafficSourceSupplier.nextReadCursor.get());
- log.error("done");
+ log.info("done");
}
@ParameterizedTest
@@ -104,7 +104,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable {
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(numExpectedRequests,
httpServer.localhostEndpoint(), new IndexWatchingReceiverFactory(), trafficSourceSupplier);
Assertions.assertEquals(trafficSourceSupplier.streams.size(), trafficSourceSupplier.nextReadCursor.get());
- log.error("done");
+ log.info("done");
}
@Getter
@@ -141,7 +141,7 @@ public ArrayCursorTrafficSourceFactory(List streams) {
public ISimpleTrafficCaptureSource get() {
var rval = new ArrayCursorTrafficCaptureSource(this);
- log.error("trafficSource="+rval+" readCursor="+rval.readCursor.get()+" nextReadCursor="+ nextReadCursor.get());
+ log.info("trafficSource="+rval+" readCursor="+rval.readCursor.get()+" nextReadCursor="+ nextReadCursor.get());
return rval;
}
}
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java
index 34d28db72..8d55a6ffc 100644
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java
@@ -11,10 +11,8 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
-import org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer;
+import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
import org.opensearch.migrations.replay.traffic.source.TrafficStreamWithEmbeddedKey;
@@ -50,10 +48,11 @@ public class KafkaRestartingTrafficReplayerTest {
public static final int PRODUCER_SLEEP_INTERVAL_MS = 100;
public static final Duration MAX_WAIT_TIME_FOR_TOPIC = Duration.ofMillis(PRODUCER_SLEEP_INTERVAL_MS*2);
+ public static final long DEFAULT_POLL_INTERVAL_MS = 5000;
@Container
// see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
- private KafkaContainer embeddedKafkaBroker =
+ private final KafkaContainer embeddedKafkaBroker =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
private static class CounterLimitedReceiverFactory implements Supplier<Consumer<SourceTargetCaptureTuple>> {
@@ -66,7 +65,7 @@ public Consumer get() {
var counter = new AtomicInteger();
return tuple -> {
if (counter.incrementAndGet() > stopPoint) {
- log.error("Request received after our ingest threshold. Throwing. Discarding " +
+ log.warn("Request received after our ingest threshold. Throwing. Discarding " +
tuple.uniqueRequestKey);
var nextStopPoint = stopPoint + new Random(stopPoint).nextInt(stopPoint + 1);
nextStopPointRef.compareAndSet(stopPoint, nextStopPoint);
@@ -90,7 +89,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable {
response->TestHttpServerContext.makeResponse(random, response));
var streamAndConsumer = TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(testSize, randomize);
var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList());
- log.atInfo().setMessage(()->trafficStreams.stream().map(ts-> TrafficStreamUtils.summarizeTrafficStream(ts))
+ log.atInfo().setMessage(()->trafficStreams.stream().map(TrafficStreamUtils::summarizeTrafficStream)
.collect(Collectors.joining("\n"))).log();
loadStreamsToKafka(buildKafkaConsumer(),
@@ -98,15 +97,16 @@ public void fullTest(int testSize, boolean randomize) throws Throwable {
TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(streamAndConsumer.numHttpTransactions,
httpServer.localhostEndpoint(), new CounterLimitedReceiverFactory(),
() -> new SentinelSensingTrafficSource(
- new KafkaProtobufConsumer(buildKafkaConsumer(), TEST_TOPIC_NAME, null)));
- log.error("done");
+ new KafkaTrafficCaptureSource(buildKafkaConsumer(), TEST_TOPIC_NAME,
+ Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS))));
+ log.info("done");
}
@SneakyThrows
private KafkaConsumer<String, byte[]> buildKafkaConsumer() {
- var kafkaConsumerProps = KafkaProtobufConsumer.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
+ var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
TEST_GROUP_CONSUMER_ID, false, null);
- kafkaConsumerProps.setProperty("max.poll.interval.ms", "5000");
+ kafkaConsumerProps.setProperty("max.poll.interval.ms", DEFAULT_POLL_INTERVAL_MS+"");
var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps);
log.atInfo().setMessage(()->"Just built KafkaConsumer="+kafkaConsumer).log();
return kafkaConsumer;
@@ -174,7 +174,8 @@ Producer buildKafkaProducer() {
throw Lombok.sneakyThrow(e);
}
});
- return () -> new KafkaProtobufConsumer(kafkaConsumer, TEST_TOPIC_NAME, null);
+ return () -> new KafkaTrafficCaptureSource(kafkaConsumer, TEST_TOPIC_NAME,
+ Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS));
}
@SneakyThrows
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java
index 1ec61749c..53b97d41c 100644
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java
@@ -236,7 +236,7 @@ public void onConnectionClose(ISourceTrafficChannelKey key, int channelInteracti
static void assertReconstructedTransactionsMatchExpectations(List<RequestResponsePacketPair> reconstructedTransactions,
int[] expectedRequestSizes,
int[] expectedResponseSizes) {
- log.error("reconstructedTransactions="+ reconstructedTransactions);
+ log.debug("reconstructedTransactions="+ reconstructedTransactions);
Assertions.assertEquals(expectedRequestSizes.length, reconstructedTransactions.size());
for (int i = 0; i< reconstructedTransactions.size(); ++i) {
Assertions.assertEquals((long) expectedRequestSizes[i],
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java
index 818a53988..9da438ce4 100644
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java
@@ -34,7 +34,7 @@ public class NettyJsonToByteBufHandlerTest {
@Test
public void testThatHttpContentsAreRepackagedToChunkSizeSpec() {
for (int i=0; i<10; ++i) {
- log.error("Testing w/ random seed="+i);
+ log.info("Testing w/ random seed="+i);
testWithSeed(new Random(i));
System.gc();
System.runFinalization();
@@ -90,7 +90,7 @@ private static List getByteBufSizesFromChannel(EmbeddedChannel channel)
static byte nonce = 0;
private int writeAndCheck(EmbeddedChannel channel, ArrayList<Integer> sizesWrittenList, int len) {
var bytes = new byte[len];
- log.warn("Writing "+len);
+ log.debug("Writing "+len);
sizesWrittenList.add(len);
Arrays.fill(bytes, nonce++);
var httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(bytes));
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java
new file mode 100644
index 000000000..b2319765a
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java
@@ -0,0 +1,171 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.Lombok;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
+import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+@Testcontainers(disabledWithoutDocker = true)
+@Tag("requiresDocker")
+public class KafkaKeepAliveTests {
+ public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
+ public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat.interval.ms";
+ public static final long MAX_POLL_INTERVAL_MS = 1000;
+ public static final long HEARTBEAT_INTERVAL_MS = 300;
+ public static final String testTopicName = "TEST_TOPIC";
+
+ Producer<String, byte[]> kafkaProducer;
+ AtomicInteger sendCompleteCount;
+ Properties kafkaProperties;
+ BlockingTrafficSource trafficSource;
+ ArrayList<ITrafficStreamKey> keysReceived;
+
+ @Container
+ // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
+ private final KafkaContainer embeddedKafkaBroker =
+ new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
+ private KafkaTrafficCaptureSource kafkaSource;
+
+ /**
+ * Set up the test case where we've produced and received 1 message, but have not yet committed it.
+ * Another message is in the process of being produced.
+ * The BlockingTrafficSource is blocked on everything after a point before the beginning of the test.
+ * @throws Exception
+ */
+ @BeforeEach
+ void setupTestCase() throws Exception {
+ kafkaProducer = KafkaTestUtils.buildKafkaProducer(embeddedKafkaBroker.getBootstrapServers());
+ this.sendCompleteCount = new AtomicInteger(0);
+ KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 0, sendCompleteCount).get();
+ Assertions.assertEquals(1, sendCompleteCount.get());
+
+ this.kafkaProperties = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
+ TEST_GROUP_CONSUMER_ID, false, null);
+ Assertions.assertNull(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY));
+
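+ // Tighten the poll interval and heartbeat and cap each poll at a single record so that the tests
+ // below can trigger group-membership timeouts quickly and control how many records arrive per poll.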
+ kafkaProperties.put(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY, MAX_POLL_INTERVAL_MS+"");
+ kafkaProperties.put(HEARTBEAT_INTERVAL_MS_KEY, HEARTBEAT_INTERVAL_MS+"");
+ kafkaProperties.put("max.poll.records", 1);
+ var kafkaConsumer = new KafkaConsumer(kafkaProperties);
+ this.kafkaSource = new KafkaTrafficCaptureSource(kafkaConsumer, testTopicName, Duration.ofMillis(MAX_POLL_INTERVAL_MS));
+ this.trafficSource = new BlockingTrafficSource(kafkaSource, Duration.ZERO);
+ this.keysReceived = new ArrayList<>();
+
+ readNextNStreams(trafficSource, keysReceived, 0, 1);
+ KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1, sendCompleteCount);
+ }
+
+ @Test
+ @Tag("longTest")
+ public void testTimeoutsDontOccurForSlowPolls() throws Exception {
+ var pollIntervalMs = Optional.ofNullable(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY))
+ .map(s->Integer.valueOf((String)s)).orElseThrow();
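+ // Schedule the commit to fire only after a full max.poll.interval.ms has elapsed; if the source's
+ // keep-alive handling were broken, the consumer would likely have been evicted from the group by
+ // then and the commit below would fail.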
+ var executor = Executors.newSingleThreadScheduledExecutor();
+ executor.schedule(()-> {
+ try {
+ var k = keysReceived.get(0);
+ log.info("Calling commit traffic stream for "+k);
+ trafficSource.commitTrafficStream(k);
+ log.info("finished committing traffic stream");
+ log.info("Stop reads to infinity");
+ // this is a way to signal back to the main thread that this thread is done
+ KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 2, sendCompleteCount);
+ } catch (Exception e) {
+ throw Lombok.sneakyThrow(e);
+ }
+ },
+ pollIntervalMs, TimeUnit.MILLISECONDS);
+
+ // read the next 2 messages so that we also pick up the final one produced by the scheduled task above
+ readNextNStreams(trafficSource, keysReceived, 1, 2);
+ Assertions.assertEquals(3, keysReceived.size());
+ // At this point, we've read all 3 messages produced, committed the first one
+ // (all the way through to Kafka), and no commits are in-flight yet for the last two messages.
+ }
+
+ @Test
+ @Tag("longTest")
+ public void testBlockedReadsAndBrokenCommitsDontCauseReordering() throws Exception {
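+ // Rough flow: produce two more records, then alternate between reading, committing, and sitting idle
+ // long enough to be evicted from the group, verifying that commits made against older group
+ // generations never lose or reorder records.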
+ for (int i=0; i<2; ++i) {
+ KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1 + i, sendCompleteCount).get();
+ }
+ readNextNStreams(trafficSource, keysReceived, 1, 1);
+
+ trafficSource.commitTrafficStream(keysReceived.get(0));
+ log.info("Called commitTrafficStream but waiting long enough for the client to leave the group. " +
+ "That will make the previous commit a 'zombie-commit' that should easily be dropped.");
+
+ log.info("1 message was committed, but not synced, 1 message is being processed." +
+ "wait long enough to fall out of the group before we can commit");
+ Thread.sleep(2*MAX_POLL_INTERVAL_MS);
+
+ var keysReceivedUntilDrop1 = keysReceived;
+ keysReceived = new ArrayList<>();
+
+ log.info("re-establish a client connection so that the following commit will work");
+ log.atInfo().setMessage(()->"1 ..."+renderNextCommitsAsString()).log();
+ readNextNStreams(trafficSource, keysReceived, 0, 1);
+ log.atInfo().setMessage(()->"2 ..."+renderNextCommitsAsString()).log();
+
+ log.info("wait long enough to fall out of the group again");
+ Thread.sleep(2*MAX_POLL_INTERVAL_MS);
+
+ var keysReceivedUntilDrop2 = keysReceived;
+ keysReceived = new ArrayList<>();
+ log.atInfo().setMessage(()->"re-establish... 3 ..."+renderNextCommitsAsString()).log();
+ readNextNStreams(trafficSource, keysReceived, 0, 1);
+ trafficSource.commitTrafficStream(keysReceivedUntilDrop1.get(1));
+ log.atInfo().setMessage(()->"re-establish... 4 ..."+renderNextCommitsAsString()).log();
+ readNextNStreams(trafficSource, keysReceived, 1, 1);
+ log.atInfo().setMessage(()->"5 ..."+renderNextCommitsAsString()).log();
+
+ Thread.sleep(2*MAX_POLL_INTERVAL_MS);
+ var keysReceivedUntilDrop3 = keysReceived;
+ keysReceived = new ArrayList<>();
+ readNextNStreams(trafficSource, keysReceived, 0, 3);
+ log.atInfo().setMessage(()->"6 ..."+kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log();
+ trafficSource.close();
+ }
+
+ private String renderNextCommitsAsString() {
+ return kafkaSource.trackingKafkaConsumer.nextCommitsToString();
+ }
+
+ @SneakyThrows
+ private static void readNextNStreams(BlockingTrafficSource kafkaSource, List<ITrafficStreamKey> keysReceived,
+ int from, int count) {
+ Assertions.assertEquals(from, keysReceived.size());
+ for (int i=0; i<count; ) {
+ var trafficStreams = kafkaSource.readNextTrafficStreamChunk().get();
+ trafficStreams.forEach(ts->{
+ var tsk = ts.getKey();
+ log.atInfo().setMessage(()->"checking for "+tsk).log();
+ Assertions.assertFalse(keysReceived.contains(tsk));
+ keysReceived.add(tsk);
+ });
+ log.info("Read "+trafficStreams.size()+" traffic streams");
+ i += trafficStreams.size();
+ }
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java
deleted file mode 100644
index e8f478b04..000000000
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-package org.opensearch.migrations.replay.kafka;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
-import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
-import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@Slf4j
-@Testcontainers(disabledWithoutDocker = true)
-@Tag("requiresDocker")
-public class KafkaProtobufConsumerLongTermTest {
-
- public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
- public static final String TEST_GROUP_PRODUCER_ID = "TEST_GROUP_PRODUCER_ID";
- public static final int TEST_RECORD_COUNT = 10;
- public static final String TEST_NODE_ID = "TestNodeId";
- public static final String TEST_TRAFFIC_STREAM_ID_STRING = "TEST_TRAFFIC_STREAM_ID_STRING";
- private static final String FAKE_READ_PACKET_DATA = "Fake pa";
- public static final int PRODUCER_SLEEP_INTERVAL_MS = 100;
- @Container
- // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
- private KafkaContainer embeddedKafkaBroker =
- new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
-
-
- Producer<String, byte[]> buildKafkaProducer() {
- var kafkaProps = new Properties();
- kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- // Property details: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms
- kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);
- kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
- kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
- kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID);
- kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBootstrapServers());
- try {
- return new KafkaProducer(kafkaProps);
- } catch (Exception e) {
- log.atError().setCause(e).log();
- System.exit(1);
- throw e;
- }
- }
-
- TrafficStream makeTestTrafficStream(Instant t, int i) {
- var timestamp = Timestamp.newBuilder()
- .setSeconds(t.getEpochSecond())
- .setNanos(t.getNano())
- .build();
- var tsb = TrafficStream.newBuilder()
- .setNumber(i);
- // TODO - add something for setNumberOfThisLastChunk. There's no point in doing that now though
- // because the code doesn't make any distinction between the very last one and the previous ones
- return tsb.setNodeId(TEST_NODE_ID)
- .setConnectionId(getConnectionId(i))
- .addSubStream(TrafficObservation.newBuilder().setTs(timestamp)
- .setRead(ReadObservation.newBuilder()
- .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)))
- .build())
- .build()).build();
-
- }
-
- private String getConnectionId(int i) {
- return TEST_TRAFFIC_STREAM_ID_STRING + "_" + i;
- }
-
- //@Test
- @Tag("longTest")
- public void testTrafficCaptureSource() throws Exception {
- String testTopicName = "TEST_TOPIC";
-
- var kafkaConsumerProps = KafkaProtobufConsumer.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
- TEST_GROUP_CONSUMER_ID, false, null);
- kafkaConsumerProps.setProperty("max.poll.interval.ms", "10000");
- var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps);
- var kafkaTrafficCaptureSource = new KafkaProtobufConsumer(kafkaConsumer, testTopicName, null);
-
- var kafkaProducer = buildKafkaProducer();
- var sendCompleteCount = new AtomicInteger(0);
- var scheduledIterationsCount = new AtomicInteger(0);
- var executor = Executors.newSingleThreadScheduledExecutor();
- executor.scheduleAtFixedRate(()->{
- var i = scheduledIterationsCount.getAndIncrement();
- if (i >= TEST_RECORD_COUNT) {
- executor.shutdown();
- } else {
- produceKafkaRecord(testTopicName, kafkaProducer, i, sendCompleteCount);
- }
- }, 0, PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS);
-
- for (int i=0; i {
- var rogueChunk = kafkaTrafficCaptureSource.readNextTrafficStreamChunk().get(1, TimeUnit.SECONDS);
- if (rogueChunk.isEmpty()) {
- // TimeoutExceptions cannot be thrown by the supplier of the CompletableFuture today, BUT we
- // could long-poll on the broker for longer than the timeout value supplied in the get() call above
- throw new TimeoutException("read actually returned 0 items, but transforming this to a " +
- "TimeoutException because either result would be valid.");
- }
- log.error("rogue chunk: "+ rogueChunk);
- });
- }
-
- private long getSleepAmountMsForProducerRun(int i) {
- return 1*1000;
- }
-
- private void produceKafkaRecord(String testTopicName, Producer<String, byte[]> kafkaProducer, int i,
- AtomicInteger sendCompleteCount) {
- var trafficStream = makeTestTrafficStream(Instant.now(), i);
- var record = new ProducerRecord(testTopicName, makeKey(i), trafficStream.toByteArray());
- var sendFuture = kafkaProducer.send(record, (metadata, exception) -> {
- sendCompleteCount.incrementAndGet();
- });
- }
-
- @NotNull
- private static String makeKey(int i) {
- return "KEY_" + i;
- }
-}
\ No newline at end of file
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java
new file mode 100644
index 000000000..691439ff1
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java
@@ -0,0 +1,90 @@
+package org.opensearch.migrations.replay.kafka;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Tag;
+import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
+import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
+import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class KafkaTestUtils {
+
+ public static final String TEST_GROUP_PRODUCER_ID = "TEST_GROUP_PRODUCER_ID";
+ private static final String FAKE_READ_PACKET_DATA = "Fake pa";
+ public static final String TEST_NODE_ID = "TestNodeId";
+ public static final String TEST_TRAFFIC_STREAM_ID_STRING = "TEST_TRAFFIC_STREAM_ID_STRING";
+
+ static Producer<String, byte[]> buildKafkaProducer(String bootstrapServers) {
+ var kafkaProps = new Properties();
+ kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ // Property details: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms
+ kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);
+ kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+ kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+ kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID);
+ kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ try {
+ return new KafkaProducer(kafkaProps);
+ } catch (Exception e) {
+ log.atError().setCause(e).log();
+ System.exit(1);
+ throw e;
+ }
+ }
+
+ static String getConnectionId(int i) {
+ return TEST_TRAFFIC_STREAM_ID_STRING + "_" + i;
+ }
+
+ static TrafficStream makeTestTrafficStream(Instant t, int i) {
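+ // Offset each stream's timestamp by i days so every generated record carries a distinct timestamp.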
+ var timestamp = Timestamp.newBuilder()
+ .setSeconds(t.plus(Duration.ofDays(i)).getEpochSecond())
+ .setNanos(t.getNano())
+ .build();
+ var tsb = TrafficStream.newBuilder()
+ .setNumber(i);
+ // TODO - add something for setNumberOfThisLastChunk. There's no point in doing that now though
+ // because the code doesn't make any distinction between the very last one and the previous ones
+ return tsb.setNodeId(TEST_NODE_ID)
+ .setConnectionId(getConnectionId(i))
+ .addSubStream(TrafficObservation.newBuilder().setTs(timestamp)
+ .setRead(ReadObservation.newBuilder()
+ .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)))
+ .build())
+ .build()).build();
+
+ }
+
+ static Future produceKafkaRecord(String testTopicName, Producer<String, byte[]> kafkaProducer,
+ int i, AtomicInteger sendCompleteCount) {
+ var trafficStream = KafkaTestUtils.makeTestTrafficStream(Instant.now(), i);
+ var record = new ProducerRecord(testTopicName, makeKey(i), trafficStream.toByteArray());
+ return kafkaProducer.send(record, (metadata, exception) -> {
+ sendCompleteCount.incrementAndGet();
+ });
+ }
+
+ @NotNull
+ private static String makeKey(int i) {
+ return "KEY_" + i;
+ }
+}
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java
new file mode 100644
index 000000000..40e15c712
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java
@@ -0,0 +1,86 @@
+package org.opensearch.migrations.replay.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+@Testcontainers(disabledWithoutDocker = true)
+@Tag("requiresDocker")
+public class KafkaTrafficCaptureSourceLongTermTest {
+
+ public static final int TEST_RECORD_COUNT = 10;
+ public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
+ public static final int PRODUCER_SLEEP_INTERVAL_MS = 100;
+
+ @Container
+ // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
+ private final KafkaContainer embeddedKafkaBroker =
+ new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
+
+
+ @Test
+ @Tag("longTest")
+ public void testTrafficCaptureSource() throws Exception {
+ String testTopicName = "TEST_TOPIC";
+
+ var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
+ TEST_GROUP_CONSUMER_ID, false, null);
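+ // A generous poll interval should keep this consumer in the group while the test slowly works
+ // through the records that the scheduled producer below is still publishing.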
+ final long MAX_POLL_MS = 10000;
+ kafkaConsumerProps.setProperty(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY, MAX_POLL_MS+"");
+ var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps);
+ var kafkaTrafficCaptureSource = new KafkaTrafficCaptureSource(kafkaConsumer, testTopicName,
+ Duration.ofMillis(MAX_POLL_MS));
+
+ var kafkaProducer = KafkaTestUtils.buildKafkaProducer(embeddedKafkaBroker.getBootstrapServers());
+ var sendCompleteCount = new AtomicInteger(0);
+ var scheduledIterationsCount = new AtomicInteger(0);
+ var executor = Executors.newSingleThreadScheduledExecutor();
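+ // Publish one record every PRODUCER_SLEEP_INTERVAL_MS until TEST_RECORD_COUNT have been sent,
+ // then shut the producer task down.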
+ executor.scheduleAtFixedRate(()->{
+ var i = scheduledIterationsCount.getAndIncrement();
+ if (i >= TEST_RECORD_COUNT) {
+ executor.shutdown();
+ } else {
+ KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, i, sendCompleteCount);
+ }
+ }, 0, PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ for (int i=0; i {
+ var rogueChunk = kafkaTrafficCaptureSource.readNextTrafficStreamChunk().get(1, TimeUnit.SECONDS);
+ if (rogueChunk.isEmpty()) {
+ // TimeoutExceptions cannot be thrown by the supplier of the CompletableFuture today, BUT we
+ // could long-poll on the broker for longer than the timeout value supplied in the get() call above
+ throw new TimeoutException("read actually returned 0 items, but transforming this to a " +
+ "TimeoutException because either result would be valid.");
+ }
+ log.error("rogue chunk: "+ rogueChunk);
+ });
+ }
+
+ private long getSleepAmountMsForProducerRun(int i) {
+ return 1*1000;
+ }
+}
\ No newline at end of file
diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java
similarity index 94%
rename from TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java
rename to TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java
index 8ae5c46d3..794da7768 100644
--- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java
+++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java
@@ -29,10 +29,9 @@
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import java.util.stream.Stream;
@Slf4j
-class KafkaProtobufConsumerTest {
+class KafkaTrafficCaptureSourceTest {
public static final int NUM_READ_ITEMS_BOUND = 1000;
public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME";
@@ -40,7 +39,8 @@ class KafkaProtobufConsumerTest {
public void testSupplyTrafficFromSource() {
int numTrafficStreams = 10;
MockConsumer<String, byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- KafkaProtobufConsumer protobufConsumer = new KafkaProtobufConsumer(mockConsumer, TEST_TOPIC_NAME);
+ KafkaTrafficCaptureSource protobufConsumer = new KafkaTrafficCaptureSource(mockConsumer, TEST_TOPIC_NAME,
+ Duration.ofHours(1));
initializeMockConsumerTopic(mockConsumer);
List substreamCounts = new ArrayList<>();
@@ -80,7 +80,8 @@ public void testSupplyTrafficFromSource() {
public void testSupplyTrafficWithUnformattedMessages() {
int numTrafficStreams = 10;
MockConsumer<String, byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- KafkaProtobufConsumer protobufConsumer = new KafkaProtobufConsumer(mockConsumer, TEST_TOPIC_NAME);
+ KafkaTrafficCaptureSource protobufConsumer = new KafkaTrafficCaptureSource(mockConsumer, TEST_TOPIC_NAME,
+ Duration.ofHours(1));
initializeMockConsumerTopic(mockConsumer);
List substreamCounts = new ArrayList<>();
@@ -129,7 +130,7 @@ public void testSupplyTrafficWithUnformattedMessages() {
@Test
public void testBuildPropertiesBaseCase() throws IOException {
- Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", false, null);
+ Properties props = KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", false, null);
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
@@ -139,7 +140,7 @@ public void testBuildPropertiesBaseCase() throws IOException {
@Test
public void testBuildPropertiesMSKAuthEnabled() throws IOException {
- Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, null);
+ Properties props = KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", true, null);
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
@@ -154,7 +155,7 @@ public void testBuildPropertiesMSKAuthEnabled() throws IOException {
@Test
public void testBuildPropertiesWithProvidedPropertyFile() throws IOException {
File simplePropertiesFile = new File("src/test/resources/kafka/simple-kafka.properties");
- Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, simplePropertiesFile.getPath());
+ Properties props = KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", true, simplePropertiesFile.getPath());
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
diff --git a/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties
index 52d5190a1..9098da413 100644
--- a/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties
+++ b/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties
@@ -14,3 +14,7 @@ appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t
# of the logs for tests
logger.OutputTupleJsonLogger.name = OutputTupleJsonLogger
logger.OutputTupleJsonLogger.level = OFF
+
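+# Surface DEBUG-level diagnostics from the Kafka capture source while tests run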
+logger.KPC.name = org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource
+logger.KPC.level = DEBUG
+logger.KPC.appenderRef.stdout.ref = Console