diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
index c880a72464..79e5969e55 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
@@ -110,9 +110,6 @@ private void doInitializeInternal() {
      */
     @Override
     public void doOutput(final Collection<Record<Event>> records) {
-        if (records.isEmpty()) {
-            return;
-        }
         s3SinkService.output(records);
     }
 }
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
index 68d504668d..99898755e4 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java
@@ -91,59 +91,48 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
         numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
         numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
         s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
+
+        reentrantLock.lock();
+        if (currentBuffer == null) {
+            currentBuffer = bufferFactory.getBuffer();
+        }
+        reentrantLock.unlock();
     }
 
     /**
      * @param records received records and add into buffer.
      */
     void output(Collection<Record<Event>> records) {
-        reentrantLock.lock();
-        if (currentBuffer == null) {
-            currentBuffer = bufferFactory.getBuffer();
+        // Don't acquire the lock if there's no work to be done
+        if (records.isEmpty() && currentBuffer.getEventCount() == 0) {
+            return;
         }
-        try {
-            OutputStream outputStream = currentBuffer.getOutputStream();
+
+        reentrantLock.lock();
+        try {
             for (Record<Event> record : records) {
                 if (currentBuffer.getEventCount() == 0) {
                     final Event eventForSchemaAutoGenerate = record.getData();
-                    codec.start(outputStream, eventForSchemaAutoGenerate, codecContext);
+                    codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
                 }
                 final Event event = record.getData();
-                codec.writeEvent(event, outputStream);
+                codec.writeEvent(event, currentBuffer.getOutputStream());
                 int count = currentBuffer.getEventCount() + 1;
                 currentBuffer.setEventCount(count);
 
                 if (event.getEventHandle() != null) {
                     bufferedEventHandles.add(event.getEventHandle());
                 }
-                if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
-                    codec.complete(outputStream);
-                    final String s3Key = generateKey(codec);
-                    LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
-                            s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
-                    final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
-                    if (isFlushToS3) {
-                        LOG.info("Successfully saved {} to S3.", s3Key);
-                        numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
-                        objectsSucceededCounter.increment();
-                        s3ObjectSizeSummary.record(currentBuffer.getSize());
-                        releaseEventHandles(true);
-                    } else {
-                        LOG.error("Failed to save {} to S3.", s3Key);
-                        numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
-                        objectsFailedCounter.increment();
-                        releaseEventHandles(false);
-                    }
-                    currentBuffer = bufferFactory.getBuffer();
-                    outputStream = currentBuffer.getOutputStream();
-                }
+                flushToS3IfNeeded();
             }
-        } catch (IOException | InterruptedException e) {
+        } catch (IOException e) {
             LOG.error("Exception while write event into buffer :", e);
         }
+
+        flushToS3IfNeeded();
+        reentrantLock.unlock();
     }
 
@@ -155,15 +144,41 @@ private void releaseEventHandles(final boolean result) {
         bufferedEventHandles.clear();
     }
 
+    private void flushToS3IfNeeded() {
+        if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
+            try {
+                codec.complete(currentBuffer.getOutputStream());
+                final String s3Key = generateKey(codec);
+                LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
+                        s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
+                final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
+                if (isFlushToS3) {
+                    LOG.info("Successfully saved {} to S3.", s3Key);
+                    numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
+                    objectsSucceededCounter.increment();
+                    s3ObjectSizeSummary.record(currentBuffer.getSize());
+                    releaseEventHandles(true);
+                } else {
+                    LOG.error("Failed to save {} to S3.", s3Key);
+                    numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
+                    objectsFailedCounter.increment();
+                    releaseEventHandles(false);
+                }
+                currentBuffer = bufferFactory.getBuffer();
+            } catch (final IOException e) {
+                LOG.error("Exception while completing codec", e);
+            }
+        }
+    }
+
     /**
      * perform retry in-case any issue occurred, based on max_upload_retries configuration.
      *
      * @param currentBuffer current buffer.
      * @param s3Key
      * @return boolean based on object upload status.
-     * @throws InterruptedException interruption during sleep.
      */
-    protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
+    protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) {
         boolean isUploadedToS3 = Boolean.FALSE;
         int retryCount = maxRetries;
         do {
@@ -178,7 +193,12 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
                 if (retryCount == 0) {
                     return isUploadedToS3;
                 }
-                Thread.sleep(5000);
+
+                try {
+                    Thread.sleep(5000);
+                } catch (final InterruptedException ex) {
+                    LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
+                }
             }
         } while (!isUploadedToS3);
         return isUploadedToS3;
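
A note on the locking pattern introduced above: output() releases reentrantLock after the catch block rather than in a finally block, so an unchecked exception thrown anywhere between lock() and unlock() (for example, from flushToS3IfNeeded()) would leave the lock held and block every later call into the sink. Likewise, the new backoff catch swallows InterruptedException without restoring the thread's interrupt flag, which can hide a shutdown request from callers. The sketch below is not part of the diff; LockSafeSink, writeRecords(), flushIfNeeded(), and backOff() are hypothetical stand-ins for the corresponding S3SinkService logic, shown only to illustrate the standard idioms:

    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical sketch, not part of the PR: writeRecords() and flushIfNeeded()
    // stand in for the buffer/codec work that S3SinkService performs.
    class LockSafeSink {
        private final ReentrantLock reentrantLock = new ReentrantLock();

        void output(Iterable<String> records) {
            reentrantLock.lock();
            try {
                writeRecords(records);
                flushIfNeeded();
            } finally {
                // Runs even if writeRecords() or flushIfNeeded() throws,
                // so one failing batch cannot leave the lock held forever.
                reentrantLock.unlock();
            }
        }

        void backOff() {
            try {
                Thread.sleep(5000);
            } catch (final InterruptedException ex) {
                // Re-set the interrupt flag so callers further up the stack
                // can still observe the interruption and shut down cleanly.
                Thread.currentThread().interrupt();
            }
        }

        private void writeRecords(Iterable<String> records) { /* no-op in this sketch */ }

        private void flushIfNeeded() { /* no-op in this sketch */ }
    }

Neither idiom changes behavior on the happy path; both only harden the failure paths that the new lock placement and retry backoff introduce.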