diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 33129b6f85bbe..fd5b2e5be2a3a 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1596,7 +1596,8 @@ class ObjectInputFile final : public io::RandomAccessFile { static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024; // Above this threshold, use a multi-part upload instead of a single request upload. Only -// relevant if early sanitization of writing to the bucket is activated. +// relevant if early sanitization of writing to the bucket is disabled (see +// `allow_delayed_open`). static constexpr int64_t kMultiPartUploadThresholdSize = kPartUploadSize - 1; static_assert(kMultiPartUploadThresholdSize < kPartUploadSize, @@ -2094,8 +2095,9 @@ class ObjectOutputStream final : public io::OutputStream { } // GH-41862: avoid potential deadlock if the Future's callback is called // with the mutex taken. + auto fut = state->pending_uploads_completed; lock.unlock(); - state->pending_uploads_completed.MarkFinished(state->status); + fut.MarkFinished(state->status); } static void HandleUploadPartOutcome(const std::shared_ptr& state, diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 068df6d711c40..c33fa4f5aac97 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -23,8 +23,6 @@ #include #include -#include -#include #include #include @@ -47,7 +45,9 @@ #include #include #include +#include #include +#include #include #include @@ -488,12 +488,12 @@ class TestS3FS : public S3TestMixin { Status RestoreTestBucket() { // First empty the test bucket, and then re-upload initial test files. - Aws::Vector all_objects; + Aws::S3::Model::Delete delete_object; { // Mostly taken from // https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp Aws::S3::Model::ListObjectsV2Request req; - req.WithBucket(Aws::String{"bucket"}); + req.SetBucket(Aws::String{"bucket"}); Aws::String continuation_token; do { @@ -507,7 +507,11 @@ class TestS3FS : public S3TestMixin { return OutcomeToStatus("ListObjectsV2", outcome); } else { Aws::Vector objects = outcome.GetResult().GetContents(); - all_objects.insert(all_objects.end(), objects.begin(), objects.end()); + for (const auto& object : objects) { + delete_object.AddObjects( + Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey())); + } + continuation_token = outcome.GetResult().GetNextContinuationToken(); } } while (!continuation_token.empty()); @@ -516,13 +520,7 @@ class TestS3FS : public S3TestMixin { { Aws::S3::Model::DeleteObjectsRequest req; - Aws::S3::Model::Delete delete_object; - for (const auto& object : all_objects) { - delete_object.AddObjects( - Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey())); - } - - req.SetDelete(delete_object); + req.SetDelete(std::move(delete_object)); req.SetBucket(Aws::String{"bucket"}); RETURN_NOT_OK(OutcomeToStatus("DeleteObjects", client_->DeleteObjects(req))); @@ -639,8 +637,6 @@ class TestS3FS : public S3TestMixin { ASSERT_OK(stream->Close()); ASSERT_TRUE(weak_fs.expired()); AssertObjectContents(client_.get(), "bucket", "newfile99", "some other data"); - - ASSERT_OK(RestoreTestBucket()); } void TestOpenOutputStreamAbort() { @@ -651,8 +647,6 @@ class TestS3FS : public S3TestMixin { ASSERT_OK(stream->Abort()); ASSERT_EQ(stream->closed(), true); AssertObjectContents(client_.get(), "bucket", "somefile", "some data"); - - ASSERT_OK(RestoreTestBucket()); } void TestOpenOutputStreamDestructor() { @@ -662,8 +656,6 @@ class TestS3FS : public S3TestMixin { // Destructor implicitly closes stream and completes the multipart upload. stream.reset(); AssertObjectContents(client_.get(), "bucket", "somefile", "new data"); - - ASSERT_OK(RestoreTestBucket()); } void TestOpenOutputStreamCloseAsyncDestructor() { @@ -696,8 +688,6 @@ class TestS3FS : public S3TestMixin { }); ASSERT_OK(close_fut.MoveResult()); AssertObjectContents(client_.get(), "bucket", "somefile", "new data"); - - ASSERT_OK(RestoreTestBucket()); } protected: @@ -1329,6 +1319,7 @@ TEST_F(TestS3FS, OpenOutputStream) { combination.ApplyToS3Options(&options_); MakeFileSystem(); TestOpenOutputStream(combination.allow_delayed_open); + ASSERT_OK(RestoreTestBucket()); } } @@ -1339,6 +1330,7 @@ TEST_F(TestS3FS, OpenOutputStreamAbort) { combination.ApplyToS3Options(&options_); MakeFileSystem(); TestOpenOutputStreamAbort(); + ASSERT_OK(RestoreTestBucket()); } } @@ -1349,6 +1341,7 @@ TEST_F(TestS3FS, OpenOutputStreamDestructor) { combination.ApplyToS3Options(&options_); MakeFileSystem(); TestOpenOutputStreamDestructor(); + ASSERT_OK(RestoreTestBucket()); } } @@ -1364,12 +1357,14 @@ TEST_F(TestS3FS, OpenOutputStreamAsync) { TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) { TestOpenOutputStreamCloseAsyncFutureDeadlock(); + ASSERT_OK(RestoreTestBucket()); } TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) { options_.background_writes = false; MakeFileSystem(); TestOpenOutputStreamCloseAsyncFutureDeadlock(); + ASSERT_OK(RestoreTestBucket()); } TEST_F(TestS3FS, OpenOutputStreamMetadata) {