Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into cloud-watch-metrics-configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
venkataraopasyavula authored Aug 18, 2023
2 parents 1c0b305 + 6322389 commit 21b26c5
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 43 deletions.
73 changes: 73 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ name: Release Artifacts

on:
workflow_dispatch:
inputs:
release-major-tag:
description: 'Whether to create major tag of docker image or not. This will create a tag such as 2.3 which points to this version.'
required: true
release-latest-tag:
description: >
'Whether to create latest tag of docker image or not. This will update the latest tag to point to this version. You should set this when releasing the latest version, but not patches to old versions.'
required: true

permissions:
id-token: write
Expand Down Expand Up @@ -95,3 +103,68 @@ jobs:

- name: Smoke Test Tarball Files
run: ./release/smoke-tests/run-tarball-files-smoke-tests.sh -v ${{ env.version }} -u ${{ secrets.ARCHIVES_PUBLIC_URL }} -n ${{ github.run_number }} -i ${{ matrix.image }} -t ${{ matrix.archive }}

protomote:
runs-on: ubuntu-latest
if: success() || failure()
needs: [build, validate-docker, validate-archive]
permissions:
contents: write
issues: write

steps:
- name: Checkout Data Prepper
uses: actions/checkout@v3
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV

- name: Get Approvers
id: get_approvers
run: |
echo "approvers=$(cat .github/CODEOWNERS | grep @ | tr -d '* ' | sed 's/@/,/g' | sed 's/,//1')" >> $GITHUB_OUTPUT
- uses: trstringer/manual-approval@v1
with:
secret: ${{ github.TOKEN }}
approvers: ${{ steps.get_approvers.outputs.approvers }}
minimum-approvals: 2
issue-title: 'Release Data Prepper : ${{ env.version }}'
issue-body: >
Please approve or deny the release of Data Prepper.
**VERSION**: ${{ env.version }}
**BUILD NUMBER**: ${{ github.run_number }}
**RELEASE MAJOR TAG**: ${{ github.event.inputs.release-major-tag }}
**RELEASE LATEST TAG**: ${{ github.event.inputs.release-latest-tag }}
exclude-workflow-initiator-as-approver: false

- name: Create Release Description
run: |
echo 'version: ${{ env.version }}' > release-description.yaml
echo 'build_number: ${{ github.run_number }}' >> release-description.yaml
echo 'release_major_tag: ${{ github.event.inputs.release-major-tag }}' >> release-description.yaml
echo 'release_latest_tag: ${{ github.event.inputs.release-latest-tag }}' >> release-description.yaml
- name: Create tag
uses: actions/github-script@v6
with:
github-token: ${{ github.TOKEN }}
script: |
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: 'refs/tags/${{ env.version }}',
sha: context.sha
})
- name: Draft release
uses: softprops/action-gh-release@v1
with:
draft: true
name: '${{ env.version }}'
tag_name: 'refs/tags/${{ env.version }}'
files: |
release-description.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ private void doInitializeInternal() {
*/
@Override
public void doOutput(final Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}
s3SinkService.output(records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -92,59 +91,44 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);

currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
}

/**
* @param records received records and add into buffer.
*/
void output(Collection<Record<Event>> records) {
reentrantLock.lock();
if (currentBuffer == null) {
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
// Don't acquire the lock if there's no work to be done
if (records.isEmpty() && currentBuffer.getEventCount() == 0) {
return;
}
try {
OutputStream outputStream = currentBuffer.getOutputStream();

reentrantLock.lock();
try {
for (Record<Event> record : records) {

if (currentBuffer.getEventCount() == 0) {
final Event eventForSchemaAutoGenerate = record.getData();
codec.start(outputStream, eventForSchemaAutoGenerate, codecContext);
codec.start(currentBuffer.getOutputStream(), eventForSchemaAutoGenerate, codecContext);
}

final Event event = record.getData();
codec.writeEvent(event, outputStream);
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

if (event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
codec.complete(outputStream);
String s3Key = currentBuffer.getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
outputStream = currentBuffer.getOutputStream();
}
flushToS3IfNeeded();
}
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
LOG.error("Exception while write event into buffer :", e);
}

flushToS3IfNeeded();

reentrantLock.unlock();
}

Expand All @@ -156,15 +140,41 @@ private void releaseEventHandles(final boolean result) {
bufferedEventHandles.clear();
}

private void flushToS3IfNeeded() {
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
try {
codec.complete(currentBuffer.getOutputStream());
String s3Key = currentBuffer.getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey);
} catch (final IOException e) {
LOG.error("Exception while completing codec", e);
}
}
}

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
* @param currentBuffer current buffer.
* @param s3Key
* @return boolean based on object upload status.
* @throws InterruptedException interruption during sleep.
*/
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) {
boolean isUploadedToS3 = Boolean.FALSE;
int retryCount = maxRetries;
do {
Expand All @@ -174,12 +184,17 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key)
} catch (AwsServiceException | SdkClientException e) {
LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:",
retryCount, e);
LOG.info("Error Massage {}", e.getMessage());
LOG.info("Error Message {}", e.getMessage());
--retryCount;
if (retryCount == 0) {
return isUploadedToS3;
}
Thread.sleep(5000);

try {
Thread.sleep(5000);
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while backing off before retrying S3 upload", ex);
}
}
} while (!isUploadedToS3);
return isUploadedToS3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}


Expand All @@ -181,7 +181,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException {
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}

@Test
Expand All @@ -200,7 +200,7 @@ void test_output_with_uploadedToS3_success() throws IOException {
assertNotNull(s3SinkService);
assertThat(s3SinkService, instanceOf(S3SinkService.class));
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(50)).increment();
verify(snapshotSuccessCounter, times(51)).increment();
}

@Test
Expand All @@ -219,7 +219,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti
final S3SinkService s3SinkService = createObjectUnderTest();
s3SinkService.output(generateRandomStringEventRecord());

verify(s3ObjectSizeSummary, times(50)).record(objectSize);
verify(s3ObjectSizeSummary, times(51)).record(objectSize);
}

@Test
Expand All @@ -245,7 +245,7 @@ void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IO

s3SinkService.output(generateEventRecords(2));

verify(snapshotSuccessCounter, times(2)).increment();
verify(snapshotSuccessCounter, times(3)).increment();
verify(codec).writeEvent(any(), eq(outputStream1));
verify(codec).writeEvent(any(), eq(outputStream2));
}
Expand Down Expand Up @@ -283,7 +283,45 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I
s3SinkService.output(Collections.singletonList(new Record<>(event)));

verify(s3ObjectSizeSummary, never()).record(anyLong());
verify(buffer, times(3)).flushToS3();
verify(buffer, times(6)).flushToS3();
}

@Test
void test_output_with_no_incoming_records_flushes_batch() throws IOException {

bufferFactory = mock(BufferFactory.class);
Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
when(buffer.getEventCount()).thenReturn(10);

final OutputStream outputStream = mock(OutputStream.class);
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
s3SinkService.output(Collections.emptyList());

verify(snapshotSuccessCounter, times(1)).increment();
verify(buffer, times(1)).flushToS3();
}

@Test
void test_output_with_no_incoming_records_or_buffered_records_short_circuits() throws IOException {

bufferFactory = mock(BufferFactory.class);
Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer);
when(buffer.getEventCount()).thenReturn(0);
final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

final OutputStream outputStream = mock(OutputStream.class);
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
doNothing().when(codec).writeEvent(event, outputStream);
final S3SinkService s3SinkService = createObjectUnderTest();
s3SinkService.output(Collections.emptyList());

verify(snapshotSuccessCounter, times(0)).increment();
verify(buffer, times(0)).flushToS3();
}

@Test
Expand Down

0 comments on commit 21b26c5

Please sign in to comment.