diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/InMemoryBufferScenario.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/InMemoryBufferScenario.java
index db58354dbe..564d1da3a3 100644
--- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/InMemoryBufferScenario.java
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/InMemoryBufferScenario.java
@@ -7,8 +7,6 @@
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.MEDIUM_OBJECT_SIZE;
-
 public class InMemoryBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return MEDIUM_OBJECT_SIZE;
+        return SizeCombination.MEDIUM_LARGER.getTotalSize();
     }
 }
diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/LocalFileBufferScenario.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/LocalFileBufferScenario.java
index f6c45d656f..3c0d37c825 100644
--- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/LocalFileBufferScenario.java
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/LocalFileBufferScenario.java
@@ -7,8 +7,6 @@
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.LARGE_OBJECT_SIZE;
-
 public class LocalFileBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return LARGE_OBJECT_SIZE;
+        return SizeCombination.LARGE.getTotalSize();
     }
 }
diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/MultiPartBufferScenario.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/MultiPartBufferScenario.java
index 3d26b3dc82..b0ec0c570f 100644
--- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/MultiPartBufferScenario.java
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/MultiPartBufferScenario.java
@@ -7,8 +7,6 @@
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
 
-import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkIT.LARGE_OBJECT_SIZE;
-
 public class MultiPartBufferScenario implements BufferScenario {
     @Override
     public BufferTypeOptions getBufferType() {
@@ -17,6 +15,6 @@ public BufferTypeOptions getBufferType() {
 
     @Override
     public int getMaximumNumberOfEvents() {
-        return LARGE_OBJECT_SIZE;
+        return SizeCombination.LARGE.getTotalSize();
    }
 }
diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java
index dfb9b18a07..be56da349a 100644
--- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java
@@ -85,9 +85,6 @@ public class S3SinkIT {
     private static final Logger LOG = LoggerFactory.getLogger(S3SinkIT.class);
     private static final Random RANDOM = new Random();
 
-    static final int MEDIUM_OBJECT_SIZE = 50 * 500;
-    static final int LARGE_OBJECT_SIZE = 500 * 2_000;
-
     private static List<String> reusableRandomStrings;
 
     @Mock
@@ -176,27 +173,31 @@ private S3Sink createObjectUnderTest() {
     }
 
     @ParameterizedTest
-    @ArgumentsSource(IntegrationTestArguments.class)
-    void test(final OutputScenario outputScenario, final BufferTypeOptions bufferTypeOptions, final CompressionScenario compressionScenario, final int batchSize, final int numberOfBatches) throws IOException {
-
-        String testRun = outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + batchSize + "-" + numberOfBatches;
+    @ArgumentsSource(BufferCombinationsArguments.class)
+    @ArgumentsSource(CodecArguments.class)
+    void test(final OutputScenario outputScenario,
+              final BufferTypeOptions bufferTypeOptions,
+              final CompressionScenario compressionScenario,
+              final SizeCombination sizeCombination) throws IOException {
+
+        String testRun = outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches();
 
         final String pathPrefix = pathPrefixForTestSuite + testRun;
         when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix + "/");
 
         when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
         when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
         when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption());
-        int expectedTotalSize = batchSize * numberOfBatches;
+        int expectedTotalSize = sizeCombination.getTotalSize();
         when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize);
 
         final S3Sink objectUnderTest = createObjectUnderTest();
 
-        final int maxEventDataToSample = 5000;
+        final int maxEventDataToSample = 2000;
         final List<Map<String, Object>> sampleEventData = new ArrayList<>(maxEventDataToSample);
-        for (int batchNumber = 0; batchNumber < numberOfBatches; batchNumber++) {
+        for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) {
             final int currentBatchNumber = batchNumber;
-            final List<Record<Event>> events = IntStream.range(0, batchSize)
-                    .mapToObj(sequence -> generateEventData((currentBatchNumber+1) * (sequence+1)))
+            final List<Record<Event>> events = IntStream.range(0, sizeCombination.getBatchSize())
+                    .mapToObj(sequence -> generateEventData((currentBatchNumber + 1) * (sequence + 1)))
                     .peek(data -> {
                         if (sampleEventData.size() < maxEventDataToSample)
                             sampleEventData.add(data);
@@ -213,7 +214,7 @@ void test(final OutputScenario outputScenario, final BufferTypeOptions bufferTyp
 
         final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
                 .bucket(bucketName)
-                .prefix(pathPrefix)
+                .prefix(pathPrefix + "/")
                 .build());
 
         assertThat(listObjectsResponse.contents(), notNullValue());
@@ -281,7 +282,13 @@ private Map<String, Object> generateEventData(final int sequence) {
         return eventDataMap;
     }
 
-    static class IntegrationTestArguments implements ArgumentsProvider {
+
+    /**
+     * These tests focus on various size combinations for the different buffer types.
+     * They should cover all buffer types with several size combinations,
+     * but only need a sample of codecs.
+     */
+    static class BufferCombinationsArguments implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
             final List<BufferScenario> bufferScenarios = List.of(
@@ -298,33 +305,60 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
                     new GZipCompressionScenario(),
                     new SnappyCompressionScenario()
             );
-            final List<Integer> numberOfRecordsPerBatchList = List.of(
-                    1,
-                    500
+            final List<SizeCombination> sizeCombinations = List.of(
+                    SizeCombination.EXACTLY_ONE,
+                    SizeCombination.MEDIUM_SMALLER,
+                    SizeCombination.LARGE
+            );
+
+            return generateCombinedArguments(bufferScenarios, outputScenarios, compressionScenarios, sizeCombinations);
+        }
+    }
+
+    /**
+     * Should test all codecs. It only varies some other conditions slightly.
+     */
+    static class CodecArguments implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
+            final List<BufferScenario> bufferScenarios = List.of(
+                    new InMemoryBufferScenario(),
+                    new MultiPartBufferScenario()
+            );
+            final List<OutputScenario> outputScenarios = List.of(
+                    new NdjsonOutputScenario()
+            );
+            final List<CompressionScenario> compressionScenarios = List.of(
+                    new NoneCompressionScenario(),
+                    new GZipCompressionScenario()
             );
-            final List<Integer> numberOfBatchesList = List.of(
-                    1,
-                    50,
-                    1_000
+            final List<SizeCombination> sizeCombinations = List.of(
+                    SizeCombination.MEDIUM_LARGER
             );
-            return outputScenarios
-                    .stream()
-                    .flatMap(outputScenario -> bufferScenarios
-                            .stream()
-                            .filter(bufferScenario -> !outputScenario.getIncompatibleBufferTypes().contains(bufferScenario.getBufferType()))
-                            .flatMap(bufferScenario -> compressionScenarios
-                                    .stream()
-                                    .flatMap(compressionScenario -> numberOfRecordsPerBatchList
-                                            .stream()
-                                            .flatMap(batchRecordCount -> numberOfBatchesList
-                                                    .stream()
-                                                    .filter(batchCount -> batchCount * batchRecordCount <= bufferScenario.getMaximumNumberOfEvents())
-                                                    .map(batchCount -> arguments(outputScenario, bufferScenario.getBufferType(), compressionScenario, batchRecordCount, batchCount))
-                                            ))));
+            return generateCombinedArguments(bufferScenarios, outputScenarios, compressionScenarios, sizeCombinations);
         }
     }
 
+    private static Stream<? extends Arguments> generateCombinedArguments(
+            final List<BufferScenario> bufferScenarios,
+            final List<OutputScenario> outputScenarios,
+            final List<CompressionScenario> compressionScenarios,
+            final List<SizeCombination> sizeCombinations) {
+        return outputScenarios
+                .stream()
+                .flatMap(outputScenario -> bufferScenarios
+                        .stream()
+                        .filter(bufferScenario -> !outputScenario.getIncompatibleBufferTypes().contains(bufferScenario.getBufferType()))
+                        .flatMap(bufferScenario -> compressionScenarios
+                                .stream()
+                                .flatMap(compressionScenario -> sizeCombinations
+                                        .stream()
+                                        .filter(sizeCombination -> sizeCombination.getTotalSize() <= bufferScenario.getMaximumNumberOfEvents())
+                                        .map(sizeCombination -> arguments(outputScenario, bufferScenario.getBufferType(), compressionScenario, sizeCombination))
+                                )));
+    }
+
     private static String reusableRandomString() {
         return reusableRandomStrings.get(RANDOM.nextInt(reusableRandomStrings.size()));
     }
diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/SizeCombination.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/SizeCombination.java
new file mode 100644
index 0000000000..e49942577b
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/SizeCombination.java
@@ -0,0 +1,32 @@
+package org.opensearch.dataprepper.plugins.sink.s3;
+
+/**
+ * Represents common size combinations.
+ */
+final class SizeCombination {
+    static final SizeCombination EXACTLY_ONE = new SizeCombination(1, 1);
+    static final SizeCombination MEDIUM_SMALLER = new SizeCombination(100, 10);
+    static final SizeCombination MEDIUM_LARGER = new SizeCombination(500, 50);
+    static final SizeCombination LARGE = new SizeCombination(500, 500);
+
+    private final int batchSize;
+
+    private final int numberOfBatches;
+
+    private SizeCombination(int batchSize, int numberOfBatches) {
+        this.batchSize = batchSize;
+        this.numberOfBatches = numberOfBatches;
+    }
+
+    int getTotalSize() {
+        return batchSize * numberOfBatches;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getNumberOfBatches() {
+        return numberOfBatches;
+    }
+}
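
Note (not part of the patch): the rewritten test relies on JUnit 5's repeatable @ArgumentsSource annotation, where each ArgumentsProvider contributes its own stream of argument sets and JUnit concatenates them into one list of invocations. The following self-contained sketch, using illustrative names that are not from this repository, shows that same pattern on a minimal SizeCombination-style value object, mirroring how S3SinkIT.test(...) now receives its SizeCombination argument from either BufferCombinationsArguments or CodecArguments.

import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class SizeCombinationPatternTest {

    // Same shape as the SizeCombination class the patch adds: batch size times batch count.
    static final class Size {
        final int batchSize;
        final int numberOfBatches;

        Size(final int batchSize, final int numberOfBatches) {
            this.batchSize = batchSize;
            this.numberOfBatches = numberOfBatches;
        }

        int totalSize() {
            return batchSize * numberOfBatches;
        }
    }

    // First provider: a few small, representative size combinations.
    static class SmallSizes implements ArgumentsProvider {
        @Override
        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
            return List.of(new Size(1, 1), new Size(100, 10)).stream().map(Arguments::of);
        }
    }

    // Second provider: a single larger combination.
    static class LargeSizes implements ArgumentsProvider {
        @Override
        public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
            return Stream.of(Arguments.of(new Size(500, 50)));
        }
    }

    // @ArgumentsSource is repeatable, so this test runs once per argument set from both providers.
    @ParameterizedTest
    @ArgumentsSource(SmallSizes.class)
    @ArgumentsSource(LargeSizes.class)
    void totalSizeIsBatchSizeTimesBatches(final Size size) {
        assertEquals(size.batchSize * size.numberOfBatches, size.totalSize());
    }
}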