Improve the S3 sink integration tests combinations #3199

Merged
InMemoryBufferScenario.java
@@ -7,8 +7,6 @@
 
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.MEDIUM_OBJECT_SIZE;
-
 public class InMemoryBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return MEDIUM_OBJECT_SIZE;
+        return SizeCombination.MEDIUM_LARGER.getTotalSize();
     }
 }
LocalFileBufferScenario.java
@@ -7,8 +7,6 @@
 
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.LARGE_OBJECT_SIZE;
-
 public class LocalFileBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return LARGE_OBJECT_SIZE;
+        return SizeCombination.LARGE.getTotalSize();
    }
 }
MultiPartBufferScenario.java
@@ -7,8 +7,6 @@
 
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.LARGE_OBJECT_SIZE;
-
 public class MultiPartBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return LARGE_OBJECT_SIZE;
+        return SizeCombination.LARGE.getTotalSize();
     }
 }
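
For context, all three scenario classes above implement the same small contract. The interface itself is not part of this diff; the following is a sketch inferred from the @Override methods shown:

// Sketch of the BufferScenario contract, inferred from the overrides above;
// not part of this PR's changes.
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;

interface BufferScenario {
    // Which buffer implementation this scenario exercises.
    BufferTypeOptions getBufferType();

    // Upper bound on the total number of events a test run may push through
    // this buffer; used later to filter out oversized SizeCombinations.
    int getMaximumNumberOfEvents();
}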
S3SinkIT.java
@@ -85,9 +85,6 @@ public class S3SinkIT {
     private static final Logger LOG = LoggerFactory.getLogger(S3SinkIT.class);
     private static final Random RANDOM = new Random();
 
-    static final int MEDIUM_OBJECT_SIZE = 50 * 500;
-    static final int LARGE_OBJECT_SIZE = 500 * 2_000;
-
     private static List<String> reusableRandomStrings;
 
     @Mock
@@ -176,27 +173,31 @@ private S3Sink createObjectUnderTest() {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(IntegrationTestArguments.class)
-    void test(final OutputScenario outputScenario, final BufferTypeOptions bufferTypeOptions, final CompressionScenario compressionScenario, final int batchSize, final int numberOfBatches) throws IOException {
-
-        String testRun = outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + batchSize + "-" + numberOfBatches;
+    @ArgumentsSource(BufferCombinationsArguments.class)
+    @ArgumentsSource(CodecArguments.class)
+    void test(final OutputScenario outputScenario,
+              final BufferTypeOptions bufferTypeOptions,
+              final CompressionScenario compressionScenario,
+              final SizeCombination sizeCombination) throws IOException {
+
+        String testRun = outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches();
         final String pathPrefix = pathPrefixForTestSuite + testRun;
         when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix + "/");
 
         when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
         when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
         when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption());
-        int expectedTotalSize = batchSize * numberOfBatches;
+        int expectedTotalSize = sizeCombination.getTotalSize();
        when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize);
 
         final S3Sink objectUnderTest = createObjectUnderTest();
 
-        final int maxEventDataToSample = 5000;
+        final int maxEventDataToSample = 2000;
         final List<Map<String, Object>> sampleEventData = new ArrayList<>(maxEventDataToSample);
-        for (int batchNumber = 0; batchNumber < numberOfBatches; batchNumber++) {
+        for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) {
             final int currentBatchNumber = batchNumber;
-            final List<Record<Event>> events = IntStream.range(0, batchSize)
-                    .mapToObj(sequence -> generateEventData((currentBatchNumber+1) * (sequence+1)))
+            final List<Record<Event>> events = IntStream.range(0, sizeCombination.getBatchSize())
+                    .mapToObj(sequence -> generateEventData((currentBatchNumber + 1) * (sequence + 1)))
                     .peek(data -> {
                         if (sampleEventData.size() < maxEventDataToSample)
                             sampleEventData.add(data);
@@ -213,7 +214,7 @@
 
         final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
                 .bucket(bucketName)
-                .prefix(pathPrefix)
+                .prefix(pathPrefix + "/")
                 .build());
 
         assertThat(listObjectsResponse.contents(), notNullValue());
@@ -281,7 +282,13 @@ private Map<String, Object> generateEventData(final int sequence) {
         return eventDataMap;
     }
 
-    static class IntegrationTestArguments implements ArgumentsProvider {
+
+    /**
+     * These tests focus on various size combinations for various buffers.
+     * They should cover all the buffers with several different size combinations,
+     * but need only a sample of codecs.
+     */
+    static class BufferCombinationsArguments implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
             final List<BufferScenario> bufferScenarios = List.of(
@@ -298,33 +305,60 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
                     new GZipCompressionScenario(),
                     new SnappyCompressionScenario()
             );
-            final List<Integer> numberOfRecordsPerBatchList = List.of(
-                    1,
-                    500
-            );
-            final List<Integer> numberOfBatchesList = List.of(
-                    1,
-                    50,
-                    1_000
-            );
+            final List<SizeCombination> sizeCombinations = List.of(
+                    SizeCombination.EXACTLY_ONE,
+                    SizeCombination.MEDIUM_SMALLER,
+                    SizeCombination.LARGE
+            );
 
-            return outputScenarios
-                    .stream()
-                    .flatMap(outputScenario -> bufferScenarios
-                            .stream()
-                            .filter(bufferScenario -> !outputScenario.getIncompatibleBufferTypes().contains(bufferScenario.getBufferType()))
-                            .flatMap(bufferScenario -> compressionScenarios
-                                    .stream()
-                                    .flatMap(compressionScenario -> numberOfRecordsPerBatchList
-                                            .stream()
-                                            .flatMap(batchRecordCount -> numberOfBatchesList
-                                                    .stream()
-                                                    .filter(batchCount -> batchCount * batchRecordCount <= bufferScenario.getMaximumNumberOfEvents())
-                                                    .map(batchCount -> arguments(outputScenario, bufferScenario.getBufferType(), compressionScenario, batchRecordCount, batchCount))
-                                            ))));
+            return generateCombinedArguments(bufferScenarios, outputScenarios, compressionScenarios, sizeCombinations);
         }
     }
 
+    /**
+     * Should test all codecs, varying the other conditions only slightly.
+     */
+    static class CodecArguments implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
+            final List<BufferScenario> bufferScenarios = List.of(
+                    new InMemoryBufferScenario(),
+                    new MultiPartBufferScenario()
+            );
+            final List<OutputScenario> outputScenarios = List.of(
+                    new NdjsonOutputScenario()
+            );
+            final List<CompressionScenario> compressionScenarios = List.of(
+                    new NoneCompressionScenario(),
+                    new GZipCompressionScenario()
+            );
+            final List<SizeCombination> sizeCombinations = List.of(
+                    SizeCombination.MEDIUM_LARGER
+            );
+
+            return generateCombinedArguments(bufferScenarios, outputScenarios, compressionScenarios, sizeCombinations);
+        }
+    }
+
+    private static Stream<? extends Arguments> generateCombinedArguments(
+            final List<BufferScenario> bufferScenarios,
+            final List<OutputScenario> outputScenarios,
+            final List<CompressionScenario> compressionScenarios,
+            final List<SizeCombination> sizeCombinations) {
+        return outputScenarios
+                .stream()
+                .flatMap(outputScenario -> bufferScenarios
+                        .stream()
+                        .filter(bufferScenario -> !outputScenario.getIncompatibleBufferTypes().contains(bufferScenario.getBufferType()))
+                        .flatMap(bufferScenario -> compressionScenarios
+                                .stream()
+                                .flatMap(compressionScenario -> sizeCombinations
+                                        .stream()
+                                        .filter(sizeCombination -> sizeCombination.getTotalSize() <= bufferScenario.getMaximumNumberOfEvents())
+                                        .map(sizeCombination -> arguments(outputScenario, bufferScenario.getBufferType(), compressionScenario, sizeCombination))
+                                )));
+    }
+
     private static String reusableRandomString() {
         return reusableRandomStrings.get(RANDOM.nextInt(reusableRandomStrings.size()));
     }
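
To see what the capacity filter in generateCombinedArguments does, here is a minimal standalone sketch (a hypothetical class, not part of this PR) applying the same predicate to the size combinations listed in BufferCombinationsArguments against the in-memory buffer's cap:

import java.util.List;

// Hypothetical illustration; mirrors the filter in generateCombinedArguments.
class CapacityFilterSketch {
    public static void main(String[] args) {
        // InMemoryBufferScenario caps events at MEDIUM_LARGER's total: 500 * 50 = 25,000.
        final int inMemoryMaximum = 500 * 50;

        // {batchSize, numberOfBatches} for EXACTLY_ONE, MEDIUM_SMALLER, LARGE.
        final List<int[]> sizeCombinations = List.of(
                new int[]{1, 1},
                new int[]{100, 10},
                new int[]{500, 500});

        sizeCombinations.stream()
                .filter(c -> c[0] * c[1] <= inMemoryMaximum) // same predicate as the provider
                .forEach(c -> System.out.println(c[0] * c[1] + " events: kept"));
        // Prints "1 events: kept" and "1000 events: kept"; LARGE (250,000 events)
        // is pruned, so the in-memory buffer is never given more than it can hold.
    }
}

This is why LARGE can appear in the provider's list even though the in-memory buffer cannot handle it: invalid pairings are removed per buffer scenario rather than per list.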
SizeCombination.java (new file)
@@ -0,0 +1,32 @@
+package org.opensearch.dataprepper.plugins.sink.s3;
+
+/**
+ * Represents common size combinations.
+ */
+final class SizeCombination {
+    static final SizeCombination EXACTLY_ONE = new SizeCombination(1, 1);
+    static final SizeCombination MEDIUM_SMALLER = new SizeCombination(100, 10);
+    static final SizeCombination MEDIUM_LARGER = new SizeCombination(500, 50);
+    static final SizeCombination LARGE = new SizeCombination(500, 500);
+
+    private final int batchSize;
+
+    private final int numberOfBatches;
+
+    private SizeCombination(int batchSize, int numberOfBatches) {
+        this.batchSize = batchSize;
+        this.numberOfBatches = numberOfBatches;
+    }
+
+    int getTotalSize() {
+        return batchSize * numberOfBatches;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getNumberOfBatches() {
+        return numberOfBatches;
+    }
+}
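
For reference, getTotalSize() is just batchSize * numberOfBatches, so the combinations work out to: EXACTLY_ONE = 1 × 1 = 1 event, MEDIUM_SMALLER = 100 × 10 = 1,000, MEDIUM_LARGER = 500 × 50 = 25,000, and LARGE = 500 × 500 = 250,000. The in-memory buffer scenario therefore caps out at 25,000 events, while the local-file and multipart scenarios allow up to 250,000.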