
S3 scan enhancements #3049

Merged

Conversation

asifsmohammed (Collaborator) commented Jul 20, 2023

Description

  • Adds scheduling configuration to the S3 scan.
  • Adds a `delete_s3_objects` option to delete S3 objects after processing. This introduces the `S3ObjectDeleteWorker` class, which will later also be used by SQS and S3 Select to delete S3 objects.
  • Renews source coordinator ownership after every 10,000 records by calling `saveProgressStateForPartition`.
  • Updates documentation with the new metrics and options.
  • Updates integration tests.

Issues Resolved

#2596 partially resolved

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following the Developer Certificate of Origin and signing off your commits, please check here.

kkondaka previously approved these changes Jul 21, 2023
@@ -31,6 +43,10 @@ public class ScanObjectWorker implements Runnable{

private static final int STANDARD_BACKOFF_MILLIS = 30_000;

// Keeping this same as source coordinator ownership time
private static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = 10_000;
Collaborator
10000 seconds? That's almost 3 hours. Is this right?

Collaborator Author

I'm actually planning to increase it, as we don't have a way to decide how long an object takes to process. Ownership is reset by saving state after every 10,000 records written to the buffer, and the default is 10,000 seconds. If we renew ownership for longer than 10,000 seconds, the acknowledgment timeout should be longer as well. I don't know if using Integer.MAX_VALUE is right.

Any thoughts or ideas of what it should be?

Member

Ideally, I think acknowledgments would give a negative ack to tell the source to give up waiting on the acknowledgment set. When there is an unknown failure, this would mean waiting longer before timing out, but we should also fix unknown failures to become known failures that give a negative ack. It is extremely difficult to estimate how long it will take an object to reach the sink, even knowing the object size, since the sink may have backpressure. A small value makes it much more likely that the timeout will occur due to sink backpressure.

Collaborator

I don't know what the right value would be. I was just asking if you intended it to be 3 hours or so.

Collaborator Author

Krishna and I looked into the existing sinks; we think we can use the max value for the ack timeout. We also identified that invoking the callback as soon as a false ack is received would help. It's not a big change, but we need to evaluate it further to determine the correct behavior.

kkondaka previously approved these changes Jul 25, 2023
@@ -117,6 +117,8 @@ All Duration values are a string that represents a duration. They support ISO_86

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

* `delete_s3_objects` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after processing. If `acknowledgments` is enabled, S3 objects will be delete only if positive acknowledgment is received by S3 source. Defaults to `false`.
Member

Nit: S3 Objects will be deleted

a `rate` of `PT1H` and a `job_count` of 3 would result in each object getting processed 3 times, starting after source is ready
and then every hour after the first time the object is processed.

* `rate` (Optional) : A String that indicates the rate to process an index based on the `job_count`.
Member

Copy-paste error: "object" instead of "index".

* `rate` (Optional) : A String that indicates the rate to process an index based on the `job_count`.
Supports ISO_8601 notation Strings ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
Defaults to 8 hours, and is only applicable when `job_count` is greater than 1.
* `job_count` (Optional) : An Integer that specifies how many times each index should be processed. Defaults to 1.
Member

Same here. index -> object
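Putting the scheduling options discussed above together, an S3 scan pipeline configuration might look roughly like the following. This is an illustrative sketch only: the bucket name is made up, and the exact option nesting may differ from the final shipped configuration; the option names (`delete_s3_objects`, `scheduling`, `rate`, `job_count`) come from the documentation changes in this PR.

```yaml
# Hypothetical S3 scan source configuration sketch
source:
  s3:
    acknowledgments: true
    delete_s3_objects: true     # delete each object after a positive ack
    scan:
      scheduling:
        rate: "PT1H"            # reprocess each object every hour...
        job_count: 3            # ...for a total of 3 passes per object
      buckets:
        - bucket:
            name: "my-example-bucket"
```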

@@ -204,6 +221,9 @@ The following policy shows the necessary permissions for S3 source. `kms:Decrypt
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source.
* `sqsMessagesFailed` - The number of SQS messages that the S3 Source failed to parse.
* `sqsMessagesDeleteFailed` - The number of SQS messages that the S3 Source failed to delete from the SQS queue.
* `s3ObjectsDeleted` - The number of S3 objects deleted by the S3 source.
* `s3ObjectsDeleteFailed` - The number of S3 objects that the S3 source failed to delete.
* `acknowledgementSetCallbackCounter` - The number of times End-to-end acknowledgments created an acknowledgment set.
Member

What is this useful for?

asifsmohammed (Collaborator Author) commented Jul 25, 2023

This metric is already part of acknowledgments in the SqsWorker class, and I added the same metric in ScanObjectWorker for S3 scan acknowledgments. Basically this is just the missing documentation plus the same metric for S3 scan.

I'm not exactly sure what this metric is useful for, but it can be used to track the total number of acknowledgments.

Comment on lines 115 to 121
// s3Client = S3Client.builder()
// .region(Region.of(System.getProperty("tests.s3source.region")))
// .build();
// s3AsyncClient = S3AsyncClient.builder()
// .region(Region.of(System.getProperty("tests.s3source.region")))
// .build();
// bucket = System.getProperty("tests.s3source.bucket");
Member

Should remove these comments if unneeded

public void parseS3Object(final S3ObjectReference s3ObjectReference,
final AcknowledgementSet acknowledgementSet,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
Member

partitionKey will always be bucketName|objectKey, which is already in the S3ObjectReference. I am ok with abstracting them though

int recordsWrittenAfterLastSaveState = bufferAccumulator.getTotalWritten() - saveStateCounter.get() * RECORDS_TO_ACCUMULATE_TO_SAVE_STATE;
// Saving state to renew source coordination ownership for every 10,000 records, ownership time is 10 minutes
if (recordsWrittenAfterLastSaveState >= RECORDS_TO_ACCUMULATE_TO_SAVE_STATE && sourceCoordinator != null && partitionKey != null) {
sourceCoordinator.saveProgressStateForPartition(partitionKey, null);
Member

Did we not want a custom ownership timeout here?

Collaborator Author

This would extend the ownership by an additional 10 minutes, right? Ideally we want a custom timeout, but how do we come up with that value?
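The renewal-by-saving-state approach discussed in this thread can be sketched as follows. This is a minimal, self-contained sketch: the `SourceCoordinator` interface here is a hypothetical stand-in for the real Data Prepper interface, which takes richer types; only `saveProgressStateForPartition(partitionKey, null)` and the 10,000-record threshold come from the PR diff itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: renew source-coordinator ownership every
// RECORDS_TO_ACCUMULATE_TO_SAVE_STATE records by saving progress state.
public class OwnershipRenewalSketch {
    static final int RECORDS_TO_ACCUMULATE_TO_SAVE_STATE = 10_000;

    // Hypothetical stand-in for Data Prepper's SourceCoordinator
    interface SourceCoordinator<T> {
        void saveProgressStateForPartition(String partitionKey, T state);
    }

    private final AtomicInteger saveStateCounter = new AtomicInteger();

    // Returns the number of saves performed so far.
    int maybeRenewOwnership(final int totalRecordsWritten,
                            final SourceCoordinator<Void> coordinator,
                            final String partitionKey) {
        final int recordsSinceLastSave =
                totalRecordsWritten - saveStateCounter.get() * RECORDS_TO_ACCUMULATE_TO_SAVE_STATE;
        // Saving state (even with a null payload) extends the partition's
        // ownership timeout, so a long-running object is not reclaimed.
        if (recordsSinceLastSave >= RECORDS_TO_ACCUMULATE_TO_SAVE_STATE
                && coordinator != null && partitionKey != null) {
            coordinator.saveProgressStateForPartition(partitionKey, null);
            saveStateCounter.incrementAndGet();
        }
        return saveStateCounter.get();
    }

    public static void main(String[] args) {
        final OwnershipRenewalSketch sketch = new OwnershipRenewalSketch();
        final SourceCoordinator<Void> coordinator = (partitionKey, state) -> { };
        // No save before the threshold; one save per 10,000 records after.
        System.out.println(sketch.maybeRenewOwnership(9_999, coordinator, "bucket|key"));
        System.out.println(sketch.maybeRenewOwnership(10_000, coordinator, "bucket|key"));
    }
}
```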

acknowledgementSet = acknowledgementSetManager.create((result) -> {
acknowledgementSetCallbackCounter.increment();
// Delete only if this is positive acknowledgement
if (result == true) {
Member

Is (result == false) a negative ack then?

Collaborator Author

Yes, but it's not useful, since the callback has already been invoked and it won't time out. We just don't close the partition or delete the S3 object when the result is false.
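The callback behavior described in this thread (delete the object and close the partition only on a positive ack, do nothing on a negative ack) can be sketched roughly like this. The `S3ObjectDeleteWorker` and `SourceCoordinator` interfaces below are hypothetical stand-ins for the real Data Prepper classes, not their actual signatures:

```java
import java.util.function.Consumer;

// Sketch of the S3 scan acknowledgment-set callback behavior.
public class AckCallbackSketch {
    // Hypothetical stand-ins for the real Data Prepper types
    interface S3ObjectDeleteWorker { void deleteS3Object(String bucket, String key); }
    interface SourceCoordinator { void completePartition(String partitionKey); }

    static Consumer<Boolean> buildCallback(final S3ObjectDeleteWorker deleteWorker,
                                           final SourceCoordinator coordinator,
                                           final String bucket,
                                           final String key,
                                           final boolean deleteS3Objects) {
        return result -> {
            // Delete and close the partition only on a positive acknowledgment;
            // on a negative ack the object stays in place for reprocessing.
            if (Boolean.TRUE.equals(result)) {
                if (deleteS3Objects) {
                    deleteWorker.deleteS3Object(bucket, key);
                }
                coordinator.completePartition(bucket + "|" + key);
            }
        };
    }

    public static void main(String[] args) {
        final StringBuilder log = new StringBuilder();
        final Consumer<Boolean> callback = buildCallback(
                (b, k) -> log.append("deleted ").append(b).append('/').append(k).append(';'),
                p -> log.append("completed ").append(p).append(';'),
                "my-bucket", "my-key", true);
        callback.accept(true);   // positive ack: delete + complete partition
        callback.accept(false);  // negative ack: no side effects
        System.out.println(log);
    }
}
```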

Signed-off-by: Asif Sohail Mohammed <[email protected]>
@asifsmohammed asifsmohammed merged commit 5262eea into opensearch-project:main Aug 7, 2023
23 of 24 checks passed