Skip to content

Commit

Permalink
Fix batch abandonment on traffic stop
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Aug 17, 2023
1 parent b7661e6 commit e01f10b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ private void doInitializeInternal() {
*/
/**
 * Hands the incoming batch to the sink service for buffering and flushing.
 *
 * <p>Deliberately does NOT short-circuit on an empty batch: {@code
 * S3SinkService.output()} must still run so that a partially-filled buffer
 * whose time-based threshold has elapsed gets flushed even after traffic
 * stops (otherwise the last batch is abandoned in the buffer forever).
 *
 * @param records records received from the pipeline; may be empty.
 */
@Override
public void doOutput(final Collection<Record<Event>> records) {
    s3SinkService.output(records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,59 +91,48 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);

reentrantLock.lock();
if (currentBuffer == null) {
currentBuffer = bufferFactory.getBuffer();
}
reentrantLock.unlock();
}

/**
* @param records received records and add into buffer.
*/
void output(Collection<Record<Event>> records) {
reentrantLock.lock();
if (currentBuffer == null) {
currentBuffer = bufferFactory.getBuffer();
// Don't acquire the lock if there's no work to be done
if (records.isEmpty() && currentBuffer.getEventCount() == 0) {
return;
}
try {
OutputStream outputStream = currentBuffer.getOutputStream();

reentrantLock.lock();
try {
for (Record<Event> record : records) {

if (currentBuffer.getEventCount() == 0) {
final Event eventForSchemaAutoGenerate = record.getData();
codec.start(outputStream, eventForSchemaAutoGenerate, codecContext);
codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
}

final Event event = record.getData();
codec.writeEvent(event, outputStream);
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
codec.complete(outputStream);
final String s3Key = generateKey(codec);
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer();
outputStream = currentBuffer.getOutputStream();
}
flushToS3IfNeeded();
}
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
LOG.error("Exception while write event into buffer :", e);
}

flushToS3IfNeeded();

reentrantLock.unlock();
}

Expand All @@ -155,15 +144,41 @@ private void releaseEventHandles(final boolean result) {
bufferedEventHandles.clear();
}

/**
 * Flushes the current buffer to S3 if the configured event-count, byte-size,
 * or collection-duration threshold has been exceeded, then starts a fresh
 * buffer. No-op while the buffer is still below every threshold.
 */
private void flushToS3IfNeeded() {
    // Below every threshold: nothing to flush yet.
    if (!ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
        return;
    }
    try {
        // Let the codec finalize the object (trailers, closing brackets, ...)
        // before the bytes are shipped.
        codec.complete(currentBuffer.getOutputStream());
        final String objectKey = generateKey(codec);
        LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
                objectKey, currentBuffer.getEventCount(), currentBuffer.getSize());

        final boolean uploadSucceeded = retryFlushToS3(currentBuffer, objectKey);
        if (uploadSucceeded) {
            LOG.info("Successfully saved {} to S3.", objectKey);
            numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
            objectsSucceededCounter.increment();
            s3ObjectSizeSummary.record(currentBuffer.getSize());
            releaseEventHandles(true);
        } else {
            LOG.error("Failed to save {} to S3.", objectKey);
            numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
            objectsFailedCounter.increment();
            releaseEventHandles(false);
        }
        // Start a fresh buffer whether or not the upload succeeded, so the
        // same data is not re-sent on the next flush.
        currentBuffer = bufferFactory.getBuffer();
    } catch (final IOException e) {
        LOG.error("Exception while completing codec", e);
    }
}

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
* @param currentBuffer current buffer.
* @param s3Key
* @return boolean based on object upload status.
* @throws InterruptedException interruption during sleep.
*/
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) {
boolean isUploadedToS3 = Boolean.FALSE;
int retryCount = maxRetries;
do {
Expand All @@ -178,7 +193,12 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
if (retryCount == 0) {
return isUploadedToS3;
}
Thread.sleep(5000);

try {
Thread.sleep(5000);
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
}
}
} while (!isUploadedToS3);
return isUploadedToS3;
Expand Down

0 comments on commit e01f10b

Please sign in to comment.