Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to the S3 sink to speed up the unit test/build time #3203

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx

private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics);
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics);
}

private int gets3ObjectCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;
import java.util.Collection;

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ public S3Sink(final PluginSetting pluginSetting,

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics);
s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
Expand All @@ -49,7 +50,7 @@ public class S3SinkService {
private Buffer currentBuffer;
private final int maxEvents;
private final ByteCount maxBytes;
private final long maxCollectionDuration;
private final Duration maxCollectionDuration;
private final String bucket;
private final int maxRetries;
private final Counter objectsSucceededCounter;
Expand All @@ -59,6 +60,7 @@ public class S3SinkService {
private final DistributionSummary s3ObjectSizeSummary;
private final OutputCodecContext codecContext;
private final KeyGenerator keyGenerator;
private final Duration retrySleepTime;

/**
* @param s3SinkConfig s3 sink related configuration.
Expand All @@ -68,20 +70,22 @@ public class S3SinkService {
* @param pluginMetrics metrics.
*/
public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory,
final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, final PluginMetrics pluginMetrics) {
final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator,
final Duration retrySleepTime, final PluginMetrics pluginMetrics) {
this.s3SinkConfig = s3SinkConfig;
this.bufferFactory = bufferFactory;
this.codec = codec;
this.s3Client = s3Client;
this.codecContext = codecContext;
this.keyGenerator = keyGenerator;
this.retrySleepTime = retrySleepTime;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut();

bucket = s3SinkConfig.getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();
Expand Down Expand Up @@ -191,7 +195,7 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
}

try {
Thread.sleep(5000);
Thread.sleep(retrySleepTime.toMillis());
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;

import java.time.Duration;

/**
* Check threshold limits.
*/
Expand All @@ -24,13 +26,13 @@ private ThresholdCheck() {
* @param maxCollectionDuration maximum event collection duration provided by user as threshold.
* @return boolean value whether the threshold are met.
*/
public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) {
public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration) {
if (maxEvents > 0) {
return currentBuffer.getEventCount() + 1 > maxEvents ||
currentBuffer.getDuration() > maxCollectionDuration ||
currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 ||
currentBuffer.getSize() > maxBytes.getBytes();
} else {
return currentBuffer.getDuration() > maxCollectionDuration ||
return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 ||
currentBuffer.getSize() > maxBytes.getBytes();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import java.io.OutputStream;
import java.time.Duration;

/**
* A buffer can hold data before flushing it to S3.
Expand All @@ -18,7 +19,7 @@ public interface Buffer {
long getSize();
int getEventCount();

long getDuration();
Duration getDuration();

void flushToS3();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Objects;

class CompressionBuffer implements Buffer {
Expand All @@ -32,7 +33,7 @@ public int getEventCount() {
}

@Override
public long getDuration() {
public Duration getDuration() {
return innerBuffer.getDuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -50,8 +51,8 @@ public int getEventCount() {
return eventCount;
}

public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration() {
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -68,8 +69,8 @@ public int getEventCount() {
}

@Override
public long getDuration(){
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration(){
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.plugins.codec.parquet.S3OutputStream;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class MultipartBuffer implements Buffer {
Expand Down Expand Up @@ -40,8 +41,8 @@ public int getEventCount() {
return eventCount;
}

public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
public Duration getDuration() {
return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void setUp() {
}

private S3SinkService createObjectUnderTest() {
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, pluginMetrics);
return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics);
}

@Test
Expand Down
Loading
Loading