Skip to content

Commit

Permalink
Updates to the S3 sink to speed up the unit test time. There are a fe…
Browse files Browse the repository at this point in the history
…w major changes - use the Duration class instead of a nebulous long to have millisecond options and clarity; inject the retry sleep time so that the tests can sleep for shorter time; using mocking where possible to avoid unnecessary sleeps. (#3203)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Aug 22, 2023
1 parent 5a02050 commit ffa8c4a
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx

private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics);
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics);
}

private int gets3ObjectCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;
import java.util.Collection;

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ public S3Sink(final PluginSetting pluginSetting,

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics);
s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
Expand All @@ -49,7 +50,7 @@ public class S3SinkService {
private Buffer currentBuffer;
private final int maxEvents;
private final ByteCount maxBytes;
private final long maxCollectionDuration;
private final Duration maxCollectionDuration;
private final String bucket;
private final int maxRetries;
private final Counter objectsSucceededCounter;
Expand All @@ -59,6 +60,7 @@ public class S3SinkService {
private final DistributionSummary s3ObjectSizeSummary;
private final OutputCodecContext codecContext;
private final KeyGenerator keyGenerator;
private final Duration retrySleepTime;

/**
* @param s3SinkConfig s3 sink related configuration.
Expand All @@ -68,20 +70,22 @@ public class S3SinkService {
* @param pluginMetrics metrics.
*/
public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory,
final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, final PluginMetrics pluginMetrics) {
final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator,
final Duration retrySleepTime, final PluginMetrics pluginMetrics) {
this.s3SinkConfig = s3SinkConfig;
this.bufferFactory = bufferFactory;
this.codec = codec;
this.s3Client = s3Client;
this.codecContext = codecContext;
this.keyGenerator = keyGenerator;
this.retrySleepTime = retrySleepTime;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut();

bucket = s3SinkConfig.getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();
Expand Down Expand Up @@ -191,7 +195,7 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
}

try {
Thread.sleep(5000);
Thread.sleep(retrySleepTime.toMillis());
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;

import java.time.Duration;

/**
* Check threshold limits.
*/
Expand All @@ -24,13 +26,13 @@ private ThresholdCheck() {
* @param maxCollectionDuration maximum event collection duration provided by user as threshold.
* @return boolean value whether the threshold are met.
*/
public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) {
public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration) {
if (maxEvents > 0) {
return currentBuffer.getEventCount() + 1 > maxEvents ||
currentBuffer.getDuration() > maxCollectionDuration ||
currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 ||
currentBuffer.getSize() > maxBytes.getBytes();
} else {
return currentBuffer.getDuration() > maxCollectionDuration ||
return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 ||
currentBuffer.getSize() > maxBytes.getBytes();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import java.io.OutputStream;
import java.time.Duration;

/**
* A buffer can hold data before flushing it to S3.
Expand All @@ -18,7 +19,7 @@ public interface Buffer {
long getSize();
int getEventCount();

long getDuration();
Duration getDuration();

void flushToS3();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Objects;

class CompressionBuffer implements Buffer {
Expand All @@ -32,7 +33,7 @@ public int getEventCount() {
}

@Override
public long getDuration() {
public Duration getDuration() {
return innerBuffer.getDuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -50,8 +51,8 @@ public int getEventCount() {
return eventCount;
}

public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration() {
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -68,8 +69,8 @@ public int getEventCount() {
}

@Override
public long getDuration(){
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration(){
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.plugins.codec.parquet.S3OutputStream;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class MultipartBuffer implements Buffer {
Expand Down Expand Up @@ -40,8 +41,8 @@ public int getEventCount() {
return eventCount;
}

public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration() {
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void setUp() {
}

private S3SinkService createObjectUnderTest() {
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics);
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics);
}

@Test
Expand Down
Loading

0 comments on commit ffa8c4a

Please sign in to comment.