Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request opensearch-project#456 from gregschohn/BlockingTra…
Browse files Browse the repository at this point in the history
…fficSourceWithKeepAlives

Blocking traffic source with keep alives
  • Loading branch information
gregschohn authored Nov 22, 2023
2 parents bdeded2 + 45cfa9e commit f70e914
Show file tree
Hide file tree
Showing 24 changed files with 1,005 additions and 496 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public CodedOutputStreamAndByteBufferWrapper createStream() {

cos.flush();
byteBufferAtomicReference.set(osh.getByteBuffer().flip().asReadOnlyBuffer());
log.error("byteBufferAtomicReference.get="+byteBufferAtomicReference.get());
log.trace("byteBufferAtomicReference.get="+byteBufferAtomicReference.get());

return CompletableFuture.completedFuture(flushCount.incrementAndGet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testHttpTransform() throws IOException {
.build();
var transformedDocument = transformer.transformJson(documentJson);
String transformedJsonOutputStr = emitJson(transformedDocument);
log.error("transformed json document: "+transformedJsonOutputStr);
log.info("transformed json document: "+transformedJsonOutputStr);
Assertions.assertTrue(transformedJsonOutputStr.contains(DUMMY_HOSTNAME_TEST_STRING));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx=\{\}}{}%n

rootLogger.level = trace
rootLogger.level = debug
rootLogger.appenderRefs = stderr
rootLogger.appenderRef.stderr.ref = STDERR
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.kafka.KafkaBehavioralPolicy;
import org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer;
import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.InputStreamOfTraffic;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;

@Slf4j
Expand All @@ -31,8 +32,9 @@ private TrafficCaptureSourceFactory() {}
}

if (isKafkaActive) {
return KafkaProtobufConsumer.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic,
appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile, new KafkaBehavioralPolicy());
return KafkaTrafficCaptureSource.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic,
appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile,
Clock.systemUTC(), new KafkaBehavioralPolicy());
} else if (isInputFileActive) {
return new InputStreamOfTraffic(new FileInputStream(appParams.inputFilename));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.opensearch.migrations.replay.datatypes;

import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;

@ToString
@EqualsAndHashCode()
public class PojoTrafficStreamKey implements ITrafficStreamKey {
private final String nodeId;
private final String connectionId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opensearch.migrations.replay.kafka;

public interface KafkaCommitOffsetData {
int getPartition();
long getOffset();
int getGeneration();

}

This file was deleted.

Loading

0 comments on commit f70e914

Please sign in to comment.