From ffa8c4a6a8a961a1a6ba06959db8707bd399954b Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 22 Aug 2023 07:05:30 -0700 Subject: [PATCH] Updates to the S3 sink to speed up the unit test time. There are a few major changes - use the Duration class instead of a nebulous long to have millisecond options and clarity; inject the retry sleep time so that the tests can sleep for shorter time; using mocking where possible to avoid unnecessary sleeps. (#3203) Signed-off-by: David Venable --- .../plugins/sink/s3/S3SinkServiceIT.java | 2 +- .../dataprepper/plugins/sink/s3/S3Sink.java | 3 +- .../plugins/sink/s3/S3SinkService.java | 12 +- .../plugins/sink/s3/ThresholdCheck.java | 8 +- .../plugins/sink/s3/accumulator/Buffer.java | 3 +- .../s3/accumulator/CompressionBuffer.java | 3 +- .../sink/s3/accumulator/InMemoryBuffer.java | 5 +- .../sink/s3/accumulator/LocalFileBuffer.java | 5 +- .../sink/s3/accumulator/MultipartBuffer.java | 5 +- .../plugins/sink/s3/S3SinkServiceTest.java | 2 +- .../plugins/sink/s3/ThresholdCheckTest.java | 150 ++++++++---------- .../s3/accumulator/CompressionBufferTest.java | 3 +- .../s3/accumulator/InMemoryBufferTest.java | 38 ++++- .../s3/accumulator/LocalFileBufferTest.java | 11 +- 14 files changed, 136 insertions(+), 114 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 562194e8cd..0aaa492abc 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -242,7 +242,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx private S3SinkService createObjectUnderTest() { OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); - return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics); } private int gets3ObjectCount() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index bb03f8be2e..0249fbe0ba 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; +import java.time.Duration; import java.util.Collection; /** @@ -75,7 +76,7 @@ public S3Sink(final PluginSetting pluginSetting, S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); - s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics); + s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics); } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 7007259ebf..4ac43721f1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.s3.S3Client; import java.io.IOException; +import java.time.Duration; import java.util.Collection; import java.util.LinkedList; import java.util.concurrent.locks.Lock; @@ -49,7 +50,7 @@ public class S3SinkService { private Buffer currentBuffer; private final int maxEvents; private final ByteCount maxBytes; - private final long maxCollectionDuration; + private final Duration maxCollectionDuration; private final String bucket; private final int maxRetries; private final Counter objectsSucceededCounter; @@ -59,6 +60,7 @@ public class S3SinkService { private final DistributionSummary s3ObjectSizeSummary; private final OutputCodecContext codecContext; private final KeyGenerator keyGenerator; + private final Duration retrySleepTime; /** * @param s3SinkConfig s3 sink related configuration. @@ -68,20 +70,22 @@ public class S3SinkService { * @param pluginMetrics metrics. */ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, final PluginMetrics pluginMetrics) { + final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, + final Duration retrySleepTime, final PluginMetrics pluginMetrics) { this.s3SinkConfig = s3SinkConfig; this.bufferFactory = bufferFactory; this.codec = codec; this.s3Client = s3Client; this.codecContext = codecContext; this.keyGenerator = keyGenerator; + this.retrySleepTime = retrySleepTime; reentrantLock = new ReentrantLock(); bufferedEventHandles = new LinkedList<>(); maxEvents = s3SinkConfig.getThresholdOptions().getEventCount(); maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize(); - maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds(); + maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut(); bucket = s3SinkConfig.getBucketName(); maxRetries = s3SinkConfig.getMaxUploadRetries(); @@ -191,7 +195,7 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) } try { - Thread.sleep(5000); + Thread.sleep(retrySleepTime.toMillis()); } catch (final InterruptedException ex) { LOG.warn("Interrupted while backing off before retrying S3 upload", ex); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheck.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheck.java index 3f1b648894..58bd9fa8dd 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheck.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheck.java @@ -8,6 +8,8 @@ import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; +import java.time.Duration; + /** * Check threshold limits. */ @@ -24,13 +26,13 @@ private ThresholdCheck() { * @param maxCollectionDuration maximum event collection duration provided by user as threshold. * @return boolean value whether the threshold are met. */ - public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) { + public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration) { if (maxEvents > 0) { return currentBuffer.getEventCount() + 1 > maxEvents || - currentBuffer.getDuration() > maxCollectionDuration || + currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || currentBuffer.getSize() > maxBytes.getBytes(); } else { - return currentBuffer.getDuration() > maxCollectionDuration || + return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || currentBuffer.getSize() > maxBytes.getBytes(); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java index a56e124632..f6031cd3a4 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import java.io.OutputStream; +import java.time.Duration; /** * A buffer can hold data before flushing it to S3. @@ -18,7 +19,7 @@ public interface Buffer { long getSize(); int getEventCount(); - long getDuration(); + Duration getDuration(); void flushToS3(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java index 47203d69c2..a0db9d8d38 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.time.Duration; import java.util.Objects; class CompressionBuffer implements Buffer { @@ -32,7 +33,7 @@ public int getEventCount() { } @Override - public long getDuration() { + public Duration getDuration() { return innerBuffer.getDuration(); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index bba1c9be8a..793ffbd23a 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -11,6 +11,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.ByteArrayOutputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -50,8 +51,8 @@ public int getEventCount() { return eventCount; } - public long getDuration() { - return watch.getTime(TimeUnit.SECONDS); + public Duration getDuration() { + return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS)); } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index 6fb0d6c043..aaff4c36ca 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -19,6 +19,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -68,8 +69,8 @@ public int getEventCount() { } @Override - public long getDuration(){ - return watch.getTime(TimeUnit.SECONDS); + public Duration getDuration(){ + return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS)); } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBuffer.java index e0fadd87f3..a76a3f926a 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBuffer.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.plugins.codec.parquet.S3OutputStream; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.TimeUnit; public class MultipartBuffer implements Buffer { @@ -40,8 +41,8 @@ public int getEventCount() { return eventCount; } - public long getDuration() { - return watch.getTime(TimeUnit.SECONDS); + public Duration getDuration() { + return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS)); } /** diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index cb311d178a..37c33adb2a 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -133,7 +133,7 @@ void setUp() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheckTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheckTest.java index 2c454fe568..17dafd9a1a 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheckTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ThresholdCheckTest.java @@ -7,130 +7,104 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; -import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; -import java.io.IOException; -import java.io.OutputStream; +import java.time.Duration; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class ThresholdCheckTest { - private Buffer inMemoryBuffer; + @Mock(lenient = true) + private Buffer buffer; + private int maxEvents; + private ByteCount maxBytes; + private Duration maxCollectionDuration; @BeforeEach - void setUp() throws IOException { - inMemoryBuffer = new InMemoryBufferFactory().getBuffer(null, null, null); - - while (inMemoryBuffer.getEventCount() < 100) { - OutputStream outputStream = inMemoryBuffer.getOutputStream(); - outputStream.write(generateByteArray()); - int eventCount = inMemoryBuffer.getEventCount() +1; - inMemoryBuffer.setEventCount(eventCount); - } + void setUp() { + maxEvents = 10_000; + maxBytes = ByteCount.parse("48mb"); + maxCollectionDuration = Duration.ofMinutes(5); } @Test - void test_exceedThreshold_true_dueTo_maxEvents_is_less_than_buffered_event_count() { - final int maxEvents = 95; - final ByteCount maxBytes = ByteCount.parse("50kb"); - final long maxCollectionDuration = 15; - boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, + void test_exceedThreshold_true_dueTo_maxEvents_is_greater_than_buffered_event_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents + 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, maxBytes, maxCollectionDuration); + assertTrue(isThresholdExceed, "Threshold not exceeded"); } @Test - void test_exceedThreshold_false_dueTo_maxEvents_is_greater_than_buffered_event_count() { - final int maxEvents = 105; - final ByteCount maxBytes = ByteCount.parse("50mb"); - final long maxCollectionDuration = 50; - boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes, - maxCollectionDuration); + void test_exceedThreshold_false_dueTo_maxEvents_is_less_than_buffered_event_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(this.maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration); + assertFalse(isThresholdExceed, "Threshold exceeded"); } @Test - void test_exceedThreshold_ture_dueTo_maxBytes_is_less_than_buffered_byte_count() { - final int maxEvents = 500; - final ByteCount maxBytes = ByteCount.parse("1b"); - final long maxCollectionDuration = 15; - boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes, - maxCollectionDuration); + void test_exceedThreshold_true_dueTo_maxBytes_is_greater_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() + 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration); + assertTrue(isThresholdExceed, "Threshold not exceeded"); } @Test - void test_exceedThreshold_false_dueTo_maxBytes_is_greater_than_buffered_byte_count() { - final int maxEvents = 500; - final ByteCount maxBytes = ByteCount.parse("8mb"); - final long maxCollectionDuration = 15; - boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, + void test_exceedThreshold_false_dueTo_maxBytes_is_less_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, maxBytes, maxCollectionDuration); + assertFalse(isThresholdExceed, "Threshold exceeded"); } @Test - void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration() - throws IOException, InterruptedException { - final int maxEvents = 500; - final ByteCount maxBytes = ByteCount.parse("500mb"); - final long maxCollectionDuration = 10; - - inMemoryBuffer = new InMemoryBufferFactory().getBuffer(null, null, null); - boolean isThresholdExceed = Boolean.FALSE; - synchronized (this) { - while (inMemoryBuffer.getEventCount() < 100) { - OutputStream outputStream = inMemoryBuffer.getOutputStream(); - outputStream.write(generateByteArray()); - int eventCount = inMemoryBuffer.getEventCount() +1; - inMemoryBuffer.setEventCount(eventCount); - isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, - maxBytes, maxCollectionDuration); - if (isThresholdExceed) { - break; - } - wait(5000); - } - } + void test_exceedThreshold_true_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.plusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration); + assertTrue(isThresholdExceed, "Threshold not exceeded"); } @Test - void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration() - throws IOException, InterruptedException { - final int maxEvents = 500; - final ByteCount maxBytes = ByteCount.parse("500mb"); - final long maxCollectionDuration = 240; - - inMemoryBuffer = new InMemoryBufferFactory().getBuffer(null,null, null); - - boolean isThresholdExceed = Boolean.FALSE; - synchronized (this) { - while (inMemoryBuffer.getEventCount() < 100) { - OutputStream outputStream = inMemoryBuffer.getOutputStream(); - outputStream.write(generateByteArray()); - int eventCount = inMemoryBuffer.getEventCount() +1; - inMemoryBuffer.setEventCount(eventCount); - isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, - maxEvents, maxBytes, maxCollectionDuration); - if (isThresholdExceed) { - break; - } - wait(50); - } - } - assertFalse(isThresholdExceed, "Threshold exceeded"); - } + void test_exceedThreshold_false_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); - private byte[] generateByteArray() { - byte[] bytes = new byte[10000]; - for (int i = 0; i < 10000; i++) { - bytes[i] = (byte) i; - } - return bytes; + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration); + + + assertFalse(isThresholdExceed, "Threshold exceeded"); } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java index 0195d1ae41..428223783a 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.time.Duration; import java.util.Random; import java.util.UUID; @@ -81,7 +82,7 @@ void getEventCount_returns_inner_getEventCount() { @Test void getDuration_returns_inner_getDuration() { - final long duration = random.nextInt(10_000) + 1_000; + final Duration duration = Duration.ofMillis(random.nextInt(10_000) + 1_000); final CompressionBuffer objectUnderTest = createObjectUnderTest(); when(innerBuffer.getDuration()).thenReturn(duration); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java index 9a681b7354..3fd377a2a6 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java @@ -5,9 +5,6 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; - import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -21,9 +18,16 @@ import java.io.IOException; import java.io.OutputStream; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.function.Supplier; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -54,8 +58,34 @@ void test_with_write_event_into_buffer() throws IOException { } assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L)); assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS)); - assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(0L)); + assertThat(inMemoryBuffer.getDuration(), notNullValue()); + assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); + + } + + @Test + void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { + Instant startTime = Instant.now(); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + Instant endTime = Instant.now(); + + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + Thread.sleep(100); + + Instant durationCheckpointTime = Instant.now(); + Duration duration = inMemoryBuffer.getDuration(); + assertThat(duration, notNullValue()); + Duration upperBoundDuration = Duration.between(startTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + Duration lowerBoundDuration = Duration.between(endTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + assertThat(duration, greaterThanOrEqualTo(lowerBoundDuration)); + assertThat(duration, lessThanOrEqualTo(upperBoundDuration)); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java index 5dd1044d5c..b4f69d6196 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java @@ -11,9 +11,11 @@ import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.PutObjectRequest; + import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.time.Duration; import java.util.UUID; import java.util.function.Supplier; @@ -61,7 +63,8 @@ void test_with_write_events_into_buffer() throws IOException { } assertThat(localFileBuffer.getSize(), greaterThan(1l)); assertThat(localFileBuffer.getEventCount(), equalTo(55)); - assertThat(localFileBuffer.getDuration(), equalTo(0L)); + assertThat(localFileBuffer.getDuration(), notNullValue()); + assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); localFileBuffer.flushAndCloseStream(); localFileBuffer.removeTemporaryFile(); assertFalse(tempFile.exists(), "The temp file has not been deleted."); @@ -71,7 +74,8 @@ void test_with_write_events_into_buffer() throws IOException { void test_without_write_events_into_buffer() { assertThat(localFileBuffer.getSize(), equalTo(0L)); assertThat(localFileBuffer.getEventCount(), equalTo(0)); - assertThat(localFileBuffer.getDuration(), equalTo(0L)); + assertThat(localFileBuffer.getDuration(), notNullValue()); + assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); localFileBuffer.flushAndCloseStream(); localFileBuffer.removeTemporaryFile(); assertFalse(tempFile.exists(), "The temp file has not been deleted."); @@ -87,7 +91,8 @@ void test_with_write_events_into_buffer_and_flush_toS3() throws IOException { } assertThat(localFileBuffer.getSize(), greaterThan(1l)); assertThat(localFileBuffer.getEventCount(), equalTo(55)); - assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(0L)); + assertThat(localFileBuffer.getDuration(), notNullValue()); + assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); when(keySupplier.get()).thenReturn(KEY); when(bucketSupplier.get()).thenReturn(BUCKET_NAME);