diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 99cee19ed1e78..fd5b2e5be2a3a 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -51,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +75,7 @@ #include #include #include +#include #include // AWS_SDK_VERSION_{MAJOR,MINOR,PATCH} are available since 1.9.7. @@ -1335,7 +1337,7 @@ struct ObjectMetadataSetter { static std::unordered_map GetSetters() { return {{"ACL", CannedACLSetter()}, {"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)}, - {"Content-Type", StringSetter(&ObjectRequest::SetContentType)}, + {"Content-Type", ContentTypeSetter()}, {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)}, {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}}; } @@ -1365,6 +1367,16 @@ struct ObjectMetadataSetter { }; } + /** We need a special setter here and can not use `StringSetter` because for e.g. the + * `PutObjectRequest`, the setter is located in the base class (instead of the concrete + * class). */ + static Setter ContentTypeSetter() { + return [](const std::string& str, ObjectRequest* req) { + req->SetContentType(str); + return Status::OK(); + }; + } + static Result ParseACL(const std::string& v) { if (v.empty()) { return S3Model::ObjectCannedACL::NOT_SET; @@ -1583,6 +1595,15 @@ class ObjectInputFile final : public io::RandomAccessFile { // (for rational, see: https://github.com/apache/arrow/issues/34363) static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; +// Above this threshold, use a multi-part upload instead of a single request upload. Only +// relevant if early sanitization of writing to the bucket is disabled (see +// `allow_delayed_open`). +static constexpr int64_t kMultiPartUploadThresholdSize = kPartUploadSize - 1; + +static_assert(kMultiPartUploadThresholdSize < kPartUploadSize, + "Multi part upload threshold size must be stricly less than the actual " + "multi part upload part size."); + // An OutputStream that writes to a S3 object class ObjectOutputStream final : public io::OutputStream { protected: @@ -1598,7 +1619,8 @@ class ObjectOutputStream final : public io::OutputStream { path_(path), metadata_(metadata), default_metadata_(options.default_metadata), - background_writes_(options.background_writes) {} + background_writes_(options.background_writes), + allow_delayed_open_(options.allow_delayed_open) {} ~ObjectOutputStream() override { // For compliance with the rest of the IO stack, Close rather than Abort, @@ -1606,29 +1628,47 @@ class ObjectOutputStream final : public io::OutputStream { io::internal::CloseFromDestructor(this); } + template + Status SetMetadataInRequest(ObjectRequest* request) { + std::shared_ptr metadata; + + if (metadata_ && metadata_->size() != 0) { + metadata = metadata_; + } else if (default_metadata_ && default_metadata_->size() != 0) { + metadata = default_metadata_; + } + + bool is_content_type_set{false}; + if (metadata) { + RETURN_NOT_OK(SetObjectMetadata(metadata, request)); + + is_content_type_set = metadata->Contains("Content-Type"); + } + + if (!is_content_type_set) { + // If we do not set anything then the SDK will default to application/xml + // which confuses some tools (https://github.com/apache/arrow/issues/11934) + // So we instead default to application/octet-stream which is less misleading + request->SetContentType("application/octet-stream"); + } + + return Status::OK(); + } + std::shared_ptr Self() { return 
std::dynamic_pointer_cast(shared_from_this()); } - Status Init() { + Status CreateMultipartUpload() { + DCHECK(ShouldBeMultipartUpload()); + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); // Initiate the multi-part upload S3Model::CreateMultipartUploadRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); - if (metadata_ && metadata_->size() != 0) { - RETURN_NOT_OK(SetObjectMetadata(metadata_, &req)); - } else if (default_metadata_ && default_metadata_->size() != 0) { - RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req)); - } - - // If we do not set anything then the SDK will default to application/xml - // which confuses some tools (https://github.com/apache/arrow/issues/11934) - // So we instead default to application/octet-stream which is less misleading - if (!req.ContentTypeHasBeenSet()) { - req.SetContentType("application/octet-stream"); - } + RETURN_NOT_OK(SetMetadataInRequest(&req)); auto outcome = client_lock.Move()->CreateMultipartUpload(req); if (!outcome.IsSuccess()) { @@ -1637,7 +1677,19 @@ class ObjectOutputStream final : public io::OutputStream { path_.key, "' in bucket '", path_.bucket, "': "), "CreateMultipartUpload", outcome.GetError()); } - upload_id_ = outcome.GetResult().GetUploadId(); + multipart_upload_id_ = outcome.GetResult().GetUploadId(); + + return Status::OK(); + } + + Status Init() { + // If we are allowed to do delayed I/O, we can use a single request to upload the + // data. If not, we use a multi-part upload and initiate it here to + // sanitize that writing to the bucket is possible. + if (!allow_delayed_open_) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + upload_state_ = std::make_shared(); closed_ = false; return Status::OK(); @@ -1648,42 +1700,62 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } - ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + if (IsMultipartCreated()) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); - S3Model::AbortMultipartUploadRequest req; - req.SetBucket(ToAwsString(path_.bucket)); - req.SetKey(ToAwsString(path_.key)); - req.SetUploadId(upload_id_); + S3Model::AbortMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(multipart_upload_id_); - auto outcome = client_lock.Move()->AbortMultipartUpload(req); - if (!outcome.IsSuccess()) { - return ErrorToStatus( - std::forward_as_tuple("When aborting multiple part upload for key '", path_.key, - "' in bucket '", path_.bucket, "': "), - "AbortMultipartUpload", outcome.GetError()); + auto outcome = client_lock.Move()->AbortMultipartUpload(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus( + std::forward_as_tuple("When aborting multiple part upload for key '", + path_.key, "' in bucket '", path_.bucket, "': "), + "AbortMultipartUpload", outcome.GetError()); + } } + current_part_.reset(); holder_ = nullptr; closed_ = true; + return Status::OK(); } // OutputStream interface + bool ShouldBeMultipartUpload() const { + return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_; + } + + bool IsMultipartCreated() const { return !multipart_upload_id_.empty(); } + Status EnsureReadyToFlushFromClose() { - if (current_part_) { - // Upload last part - RETURN_NOT_OK(CommitCurrentPart()); - } + if (ShouldBeMultipartUpload()) { + if (current_part_) { + // Upload last part + RETURN_NOT_OK(CommitCurrentPart()); + } - // S3 mandates at least one part, upload an empty one if necessary - if (part_number_ 
== 1) { - RETURN_NOT_OK(UploadPart("", 0)); + // S3 mandates at least one part, upload an empty one if necessary + if (part_number_ == 1) { + RETURN_NOT_OK(UploadPart("", 0)); + } + } else { + RETURN_NOT_OK(UploadUsingSingleRequest()); } return Status::OK(); } + Status CleanupAfterClose() { + holder_ = nullptr; + closed_ = true; + return Status::OK(); + } + Status FinishPartUploadAfterFlush() { ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); @@ -1697,7 +1769,7 @@ class ObjectOutputStream final : public io::OutputStream { S3Model::CompleteMultipartUploadRequest req; req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); - req.SetUploadId(upload_id_); + req.SetUploadId(multipart_upload_id_); req.SetMultipartUpload(std::move(completed_upload)); auto outcome = @@ -1709,8 +1781,6 @@ class ObjectOutputStream final : public io::OutputStream { "CompleteMultipartUpload", outcome.GetError()); } - holder_ = nullptr; - closed_ = true; return Status::OK(); } @@ -1720,7 +1790,12 @@ class ObjectOutputStream final : public io::OutputStream { RETURN_NOT_OK(EnsureReadyToFlushFromClose()); RETURN_NOT_OK(Flush()); - return FinishPartUploadAfterFlush(); + + if (IsMultipartCreated()) { + RETURN_NOT_OK(FinishPartUploadAfterFlush()); + } + + return CleanupAfterClose(); } Future<> CloseAsync() override { @@ -1729,8 +1804,12 @@ class ObjectOutputStream final : public io::OutputStream { RETURN_NOT_OK(EnsureReadyToFlushFromClose()); // Wait for in-progress uploads to finish (if async writes are enabled) - return FlushAsync().Then( - [self = Self()]() { return self->FinishPartUploadAfterFlush(); }); + return FlushAsync().Then([self = Self()]() { + if (self->IsMultipartCreated()) { + RETURN_NOT_OK(self->FinishPartUploadAfterFlush()); + } + return self->CleanupAfterClose(); + }); } bool closed() const override { return closed_; } @@ -1776,7 +1855,8 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } - // Upload current buffer + // Upload current buffer. We're only reaching this point if we have accumulated + // enough data to upload. RETURN_NOT_OK(CommitCurrentPart()); } @@ -1810,40 +1890,73 @@ class ObjectOutputStream final : public io::OutputStream { } // Wait for background writes to finish std::unique_lock lock(upload_state_->mutex); - return upload_state_->pending_parts_completed; + return upload_state_->pending_uploads_completed; } // Upload-related helpers Status CommitCurrentPart() { + if (!IsMultipartCreated()) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish()); current_part_.reset(); current_part_size_ = 0; return UploadPart(buf); } - Status UploadPart(std::shared_ptr buffer) { - return UploadPart(buffer->data(), buffer->size(), buffer); + Status UploadUsingSingleRequest() { + std::shared_ptr buf; + if (current_part_ == nullptr) { + // In case the stream is closed directly after it has been opened without writing + // anything, we'll have to create an empty buffer. 
+ buf = std::make_shared(""); + } else { + ARROW_ASSIGN_OR_RAISE(buf, current_part_->Finish()); + } + + current_part_.reset(); + current_part_size_ = 0; + return UploadUsingSingleRequest(buf); } - Status UploadPart(const void* data, int64_t nbytes, - std::shared_ptr owned_buffer = nullptr) { - S3Model::UploadPartRequest req; + template + using UploadResultCallbackFunction = + std::function, + int32_t part_number, OutcomeType outcome)>; + + static Result TriggerUploadRequest( + const Aws::S3::Model::PutObjectRequest& request, + const std::shared_ptr& holder) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return client_lock.Move()->PutObject(request); + } + + static Result TriggerUploadRequest( + const Aws::S3::Model::UploadPartRequest& request, + const std::shared_ptr& holder) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return client_lock.Move()->UploadPart(request); + } + + template + Status Upload( + RequestType&& req, + UploadResultCallbackFunction sync_result_callback, + UploadResultCallbackFunction async_result_callback, + const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { req.SetBucket(ToAwsString(path_.bucket)); req.SetKey(ToAwsString(path_.key)); - req.SetUploadId(upload_id_); - req.SetPartNumber(part_number_); + req.SetBody(std::make_shared(data, nbytes)); req.SetContentLength(nbytes); if (!background_writes_) { req.SetBody(std::make_shared(data, nbytes)); - ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); - auto outcome = client_lock.Move()->UploadPart(req); - if (!outcome.IsSuccess()) { - return UploadPartError(req, outcome); - } else { - AddCompletedPart(upload_state_, part_number_, outcome.GetResult()); - } + + ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder_)); + + RETURN_NOT_OK(sync_result_callback(req, upload_state_, part_number_, outcome)); } else { // If the data isn't owned, make an immutable copy for the lifetime of the closure if (owned_buffer == nullptr) { @@ -1858,19 +1971,18 @@ class ObjectOutputStream final : public io::OutputStream { { std::unique_lock lock(upload_state_->mutex); - if (upload_state_->parts_in_progress++ == 0) { - upload_state_->pending_parts_completed = Future<>::Make(); + if (upload_state_->uploads_in_progress++ == 0) { + upload_state_->pending_uploads_completed = Future<>::Make(); } } // The closure keeps the buffer and the upload state alive auto deferred = [owned_buffer, holder = holder_, req = std::move(req), - state = upload_state_, + state = upload_state_, async_result_callback, part_number = part_number_]() mutable -> Status { - ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); - auto outcome = client_lock.Move()->UploadPart(req); - HandleUploadOutcome(state, part_number, req, outcome); - return Status::OK(); + ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder)); + + return async_result_callback(req, state, part_number, outcome); }; RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred))); } @@ -1880,9 +1992,118 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } - static void HandleUploadOutcome(const std::shared_ptr& state, - int part_number, const S3Model::UploadPartRequest& req, - const Result& result) { + static Status UploadUsingSingleRequestError( + const Aws::S3::Model::PutObjectRequest& request, + const Aws::S3::Model::PutObjectOutcome& outcome) { + return ErrorToStatus( + std::forward_as_tuple("When uploading object with key '", request.GetKey(), + "' in bucket '", 
request.GetBucket(), "': "), + "PutObject", outcome.GetError()); + } + + Status UploadUsingSingleRequest(std::shared_ptr buffer) { + return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer); + } + + Status UploadUsingSingleRequest(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest& request, + std::shared_ptr state, + int32_t part_number, + Aws::S3::Model::PutObjectOutcome outcome) { + if (!outcome.IsSuccess()) { + return UploadUsingSingleRequestError(request, outcome); + } + return Status::OK(); + }; + + auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& request, + std::shared_ptr state, + int32_t part_number, + Aws::S3::Model::PutObjectOutcome outcome) { + HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult()); + return Status::OK(); + }; + + Aws::S3::Model::PutObjectRequest req{}; + RETURN_NOT_OK(SetMetadataInRequest(&req)); + + return Upload( + std::move(req), std::move(sync_result_callback), std::move(async_result_callback), + data, nbytes, std::move(owned_buffer)); + } + + Status UploadPart(std::shared_ptr buffer) { + return UploadPart(buffer->data(), buffer->size(), buffer); + } + + static Status UploadPartError(const Aws::S3::Model::UploadPartRequest& request, + const Aws::S3::Model::UploadPartOutcome& outcome) { + return ErrorToStatus( + std::forward_as_tuple("When uploading part for key '", request.GetKey(), + "' in bucket '", request.GetBucket(), "': "), + "UploadPart", outcome.GetError()); + } + + Status UploadPart(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (!IsMultipartCreated()) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + + Aws::S3::Model::UploadPartRequest req{}; + req.SetPartNumber(part_number_); + req.SetUploadId(multipart_upload_id_); + + auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest& request, + std::shared_ptr state, + int32_t part_number, + Aws::S3::Model::UploadPartOutcome outcome) { + if (!outcome.IsSuccess()) { + return UploadPartError(request, outcome); + } else { + AddCompletedPart(state, part_number, outcome.GetResult()); + } + + return Status::OK(); + }; + + auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest& request, + std::shared_ptr state, + int32_t part_number, + Aws::S3::Model::UploadPartOutcome outcome) { + HandleUploadPartOutcome(state, part_number, request, outcome.GetResult()); + return Status::OK(); + }; + + return Upload( + std::move(req), std::move(sync_result_callback), std::move(async_result_callback), + data, nbytes, std::move(owned_buffer)); + } + + static void HandleUploadUsingSingleRequestOutcome( + const std::shared_ptr& state, const S3Model::PutObjectRequest& req, + const Result& result) { + std::unique_lock lock(state->mutex); + if (!result.ok()) { + state->status &= result.status(); + } else { + const auto& outcome = *result; + if (!outcome.IsSuccess()) { + state->status &= UploadUsingSingleRequestError(req, outcome); + } + } + // GH-41862: avoid potential deadlock if the Future's callback is called + // with the mutex taken. 
+ auto fut = state->pending_uploads_completed; + lock.unlock(); + fut.MarkFinished(state->status); + } + + static void HandleUploadPartOutcome(const std::shared_ptr& state, + int part_number, + const S3Model::UploadPartRequest& req, + const Result& result) { std::unique_lock lock(state->mutex); if (!result.ok()) { state->status &= result.status(); @@ -1895,10 +2116,10 @@ class ObjectOutputStream final : public io::OutputStream { } } // Notify completion - if (--state->parts_in_progress == 0) { + if (--state->uploads_in_progress == 0) { // GH-41862: avoid potential deadlock if the Future's callback is called // with the mutex taken. - auto fut = state->pending_parts_completed; + auto fut = state->pending_uploads_completed; lock.unlock(); // State could be mutated concurrently if another thread writes to the // stream, but in this case the Flush() call is only advisory anyway. @@ -1923,14 +2144,6 @@ class ObjectOutputStream final : public io::OutputStream { state->completed_parts[slot] = std::move(part); } - static Status UploadPartError(const S3Model::UploadPartRequest& req, - const S3Model::UploadPartOutcome& outcome) { - return ErrorToStatus( - std::forward_as_tuple("When uploading part for key '", req.GetKey(), - "' in bucket '", req.GetBucket(), "': "), - "UploadPart", outcome.GetError()); - } - protected: std::shared_ptr holder_; const io::IOContext io_context_; @@ -1938,8 +2151,9 @@ class ObjectOutputStream final : public io::OutputStream { const std::shared_ptr metadata_; const std::shared_ptr default_metadata_; const bool background_writes_; + const bool allow_delayed_open_; - Aws::String upload_id_; + Aws::String multipart_upload_id_; bool closed_ = true; int64_t pos_ = 0; int32_t part_number_ = 1; @@ -1950,10 +2164,11 @@ class ObjectOutputStream final : public io::OutputStream { // in the completion handler. struct UploadState { std::mutex mutex; + // Only populated for multi-part uploads. Aws::Vector completed_parts; - int64_t parts_in_progress = 0; + int64_t uploads_in_progress = 0; Status status; - Future<> pending_parts_completed = Future<>::MakeFinished(Status::OK()); + Future<> pending_uploads_completed = Future<>::MakeFinished(Status::OK()); }; std::shared_ptr upload_state_; }; diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index fbbe9d0b3f42b..85d5ff8fed553 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -177,6 +177,16 @@ struct ARROW_EXPORT S3Options { /// to be true to address these scenarios. bool check_directory_existence_before_creation = false; + /// Whether to allow file-open methods to return before the actual open. + /// + /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`, + /// and similar methods, by reducing the number of roundtrips necessary. It may also + /// allow usage of more efficient S3 APIs for small files. + /// The downside is that failure conditions such as attempting to open a file in a + /// non-existing bucket will only be reported when actual I/O is done (at worse, + /// when attempting to close the file). + bool allow_delayed_open = false; + /// \brief Default metadata for OpenOutputStream. /// /// This will be ignored if non-empty metadata is passed to OpenOutputStream. 
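For illustration only, a minimal usage sketch of the new `allow_delayed_open` option documented above; this is not part of the patch. It assumes the existing public Arrow C++ filesystem API (arrow::fs::S3Options::Defaults(), S3FileSystem::Make, FileSystem::OpenOutputStream, arrow::key_value_metadata); only `allow_delayed_open` itself comes from this diff, and the bucket/key names are placeholders.

#include <arrow/filesystem/s3fs.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/key_value_metadata.h>

arrow::Status WriteSmallObject() {
  ARROW_RETURN_NOT_OK(arrow::fs::EnsureS3Initialized());

  auto options = arrow::fs::S3Options::Defaults();
  // Defer the open: OpenOutputStream() no longer creates a multipart upload
  // up front, so a nonexistent bucket is only reported once I/O happens
  // (at worst, at Close()).
  options.allow_delayed_open = true;

  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::S3FileSystem::Make(options));

  // Pass an explicit Content-Type; without any metadata the stream defaults
  // to application/octet-stream rather than the SDK's application/xml.
  auto metadata = arrow::key_value_metadata({"Content-Type"}, {"text/plain"});
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        fs->OpenOutputStream("bucket/small-file.txt", metadata));

  ARROW_RETURN_NOT_OK(stream->Write("hello", 5));
  // For data below the multipart threshold, Close() issues a single PutObject.
  return stream->Close();
}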
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 5a160a78ceea0..c33fa4f5aac97 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -45,7 +45,9 @@ #include #include #include +#include #include +#include #include #include @@ -450,25 +452,8 @@ class TestS3FS : public S3TestMixin { req.SetBucket(ToAwsString("empty-bucket")); ASSERT_OK(OutcomeToStatus("CreateBucket", client_->CreateBucket(req))); } - { - Aws::S3::Model::PutObjectRequest req; - req.SetBucket(ToAwsString("bucket")); - req.SetKey(ToAwsString("emptydir/")); - req.SetBody(std::make_shared("")); - ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); - // NOTE: no need to create intermediate "directories" somedir/ and - // somedir/subdir/ - req.SetKey(ToAwsString("somedir/subdir/subfile")); - req.SetBody(std::make_shared("sub data")); - ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); - req.SetKey(ToAwsString("somefile")); - req.SetBody(std::make_shared("some data")); - req.SetContentType("x-arrow/test"); - ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); - req.SetKey(ToAwsString("otherdir/1/2/3/otherfile")); - req.SetBody(std::make_shared("other data")); - ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); - } + + ASSERT_OK(PopulateTestBucket()); } void TearDown() override { @@ -478,6 +463,72 @@ class TestS3FS : public S3TestMixin { S3TestMixin::TearDown(); } + Status PopulateTestBucket() { + Aws::S3::Model::PutObjectRequest req; + req.SetBucket(ToAwsString("bucket")); + req.SetKey(ToAwsString("emptydir/")); + req.SetBody(std::make_shared("")); + RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); + // NOTE: no need to create intermediate "directories" somedir/ and + // somedir/subdir/ + req.SetKey(ToAwsString("somedir/subdir/subfile")); + req.SetBody(std::make_shared("sub data")); + RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); + req.SetKey(ToAwsString("somefile")); + req.SetBody(std::make_shared("some data")); + req.SetContentType("x-arrow/test"); + RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); + req.SetKey(ToAwsString("otherdir/1/2/3/otherfile")); + req.SetBody(std::make_shared("other data")); + RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req))); + + return Status::OK(); + } + + Status RestoreTestBucket() { + // First empty the test bucket, and then re-upload initial test files. 
+ + Aws::S3::Model::Delete delete_object; + { + // Mostly taken from + // https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp + Aws::S3::Model::ListObjectsV2Request req; + req.SetBucket(Aws::String{"bucket"}); + + Aws::String continuation_token; + do { + if (!continuation_token.empty()) { + req.SetContinuationToken(continuation_token); + } + + auto outcome = client_->ListObjectsV2(req); + + if (!outcome.IsSuccess()) { + return OutcomeToStatus("ListObjectsV2", outcome); + } else { + Aws::Vector objects = outcome.GetResult().GetContents(); + for (const auto& object : objects) { + delete_object.AddObjects( + Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey())); + } + + continuation_token = outcome.GetResult().GetNextContinuationToken(); + } + } while (!continuation_token.empty()); + } + + { + Aws::S3::Model::DeleteObjectsRequest req; + + req.SetDelete(std::move(delete_object)); + req.SetBucket(Aws::String{"bucket"}); + + RETURN_NOT_OK(OutcomeToStatus("DeleteObjects", client_->DeleteObjects(req))); + } + + return PopulateTestBucket(); + } + Result> MakeNewFileSystem( io::IOContext io_context = io::default_io_context()) { options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key()); @@ -518,11 +569,13 @@ class TestS3FS : public S3TestMixin { AssertFileInfo(infos[11], "empty-bucket", FileType::Directory); } - void TestOpenOutputStream() { + void TestOpenOutputStream(bool allow_delayed_open) { std::shared_ptr stream; - // Nonexistent - ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile")); + if (!allow_delayed_open) { + // Nonexistent + ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile")); + } // URI ASSERT_RAISES(Invalid, fs_->OpenOutputStream("s3:bucket/newfile1")); @@ -843,8 +896,8 @@ TEST_F(TestS3FS, GetFileInfoGenerator) { TEST_F(TestS3FS, GetFileInfoGeneratorStress) { // This test is slow because it needs to create a bunch of seed files. However, it is - // the only test that stresses listing and deleting when there are more than 1000 files - // and paging is required. + // the only test that stresses listing and deleting when there are more than 1000 + // files and paging is required. constexpr int32_t kNumDirs = 4; constexpr int32_t kNumFilesPerDir = 512; FileInfoVector expected_infos; @@ -1235,50 +1288,83 @@ TEST_F(TestS3FS, OpenInputFile) { ASSERT_RAISES(IOError, file->Seek(10)); } -TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); } +struct S3OptionsTestParameters { + bool background_writes{false}; + bool allow_delayed_open{false}; -TEST_F(TestS3FS, OpenOutputStreamSyncWrites) { - options_.background_writes = false; - MakeFileSystem(); - TestOpenOutputStream(); -} + void ApplyToS3Options(S3Options* options) const { + options->background_writes = background_writes; + options->allow_delayed_open = allow_delayed_open; + } -TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) { TestOpenOutputStreamAbort(); } + static std::vector GetCartesianProduct() { + return { + S3OptionsTestParameters{true, false}, + S3OptionsTestParameters{false, false}, + S3OptionsTestParameters{true, true}, + S3OptionsTestParameters{false, true}, + }; + } -TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) { - options_.background_writes = false; - MakeFileSystem(); - TestOpenOutputStreamAbort(); -} + std::string ToString() const { + return std::string("background_writes = ") + (background_writes ? "true" : "false") + + ", allow_delayed_open = " + (allow_delayed_open ? 
"true" : "false"); + } +}; + +TEST_F(TestS3FS, OpenOutputStream) { + for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { + ARROW_SCOPED_TRACE(combination.ToString()); -TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) { - TestOpenOutputStreamDestructor(); + combination.ApplyToS3Options(&options_); + MakeFileSystem(); + TestOpenOutputStream(combination.allow_delayed_open); + ASSERT_OK(RestoreTestBucket()); + } } -TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) { - options_.background_writes = false; - MakeFileSystem(); - TestOpenOutputStreamDestructor(); +TEST_F(TestS3FS, OpenOutputStreamAbort) { + for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { + ARROW_SCOPED_TRACE(combination.ToString()); + + combination.ApplyToS3Options(&options_); + MakeFileSystem(); + TestOpenOutputStreamAbort(); + ASSERT_OK(RestoreTestBucket()); + } } -TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) { - TestOpenOutputStreamCloseAsyncDestructor(); +TEST_F(TestS3FS, OpenOutputStreamDestructor) { + for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { + ARROW_SCOPED_TRACE(combination.ToString()); + + combination.ApplyToS3Options(&options_); + MakeFileSystem(); + TestOpenOutputStreamDestructor(); + ASSERT_OK(RestoreTestBucket()); + } } -TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) { - options_.background_writes = false; - MakeFileSystem(); - TestOpenOutputStreamCloseAsyncDestructor(); +TEST_F(TestS3FS, OpenOutputStreamAsync) { + for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { + ARROW_SCOPED_TRACE(combination.ToString()); + + combination.ApplyToS3Options(&options_); + MakeFileSystem(); + TestOpenOutputStreamCloseAsyncDestructor(); + } } TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) { TestOpenOutputStreamCloseAsyncFutureDeadlock(); + ASSERT_OK(RestoreTestBucket()); } TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) { options_.background_writes = false; MakeFileSystem(); TestOpenOutputStreamCloseAsyncFutureDeadlock(); + ASSERT_OK(RestoreTestBucket()); } TEST_F(TestS3FS, OpenOutputStreamMetadata) { @@ -1396,8 +1482,8 @@ TEST_F(TestS3FS, CustomRetryStrategy) { auto retry_strategy = std::make_shared(); options_.retry_strategy = retry_strategy; MakeFileSystem(); - // Attempt to open file that doesn't exist. Should hit TestRetryStrategy::ShouldRetry() - // 3 times before bubbling back up here. + // Attempt to open file that doesn't exist. Should hit + // TestRetryStrategy::ShouldRetry() 3 times before bubbling back up here. ASSERT_RAISES(IOError, fs_->OpenInputStream("nonexistent-bucket/somefile")); ASSERT_EQ(retry_strategy->GetErrorsEncountered().size(), 3); for (const auto& error : retry_strategy->GetErrorsEncountered()) {