Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
OliLay committed Jul 17, 2024
1 parent ceb5de8 commit 57ff992
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 22 deletions.
6 changes: 4 additions & 2 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;

// Above this threshold, use a multi-part upload instead of a single request upload. Only
// relevant if early sanitization of writing to the bucket is activated.
// relevant if early sanitization of writing to the bucket is disabled (see
// `allow_delayed_open`).
static constexpr int64_t kMultiPartUploadThresholdSize = kPartUploadSize - 1;

static_assert(kMultiPartUploadThresholdSize < kPartUploadSize,
Expand Down Expand Up @@ -2094,8 +2095,9 @@ class ObjectOutputStream final : public io::OutputStream {
}
// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
auto fut = state->pending_uploads_completed;
lock.unlock();
state->pending_uploads_completed.MarkFinished(state->status);
fut.MarkFinished(state->status);
}

static void HandleUploadPartOutcome(const std::shared_ptr<UploadState>& state,
Expand Down
35 changes: 15 additions & 20 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include <utility>
#include <vector>

#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

Expand All @@ -47,7 +45,9 @@
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/sts/STSClient.h>

Expand Down Expand Up @@ -488,12 +488,12 @@ class TestS3FS : public S3TestMixin {
Status RestoreTestBucket() {
// First empty the test bucket, and then re-upload initial test files.

Aws::Vector<Aws::S3::Model::Object> all_objects;
Aws::S3::Model::Delete delete_object;
{
// Mostly taken from
// https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp
Aws::S3::Model::ListObjectsV2Request req;
req.WithBucket(Aws::String{"bucket"});
req.SetBucket(Aws::String{"bucket"});

Aws::String continuation_token;
do {
Expand All @@ -507,7 +507,11 @@ class TestS3FS : public S3TestMixin {
return OutcomeToStatus("ListObjectsV2", outcome);
} else {
Aws::Vector<Aws::S3::Model::Object> objects = outcome.GetResult().GetContents();
all_objects.insert(all_objects.end(), objects.begin(), objects.end());
for (const auto& object : objects) {
delete_object.AddObjects(
Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey()));
}

continuation_token = outcome.GetResult().GetNextContinuationToken();
}
} while (!continuation_token.empty());
Expand All @@ -516,13 +520,7 @@ class TestS3FS : public S3TestMixin {
{
Aws::S3::Model::DeleteObjectsRequest req;

Aws::S3::Model::Delete delete_object;
for (const auto& object : all_objects) {
delete_object.AddObjects(
Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey()));
}

req.SetDelete(delete_object);
req.SetDelete(std::move(delete_object));
req.SetBucket(Aws::String{"bucket"});

RETURN_NOT_OK(OutcomeToStatus("DeleteObjects", client_->DeleteObjects(req)));
Expand Down Expand Up @@ -639,8 +637,6 @@ class TestS3FS : public S3TestMixin {
ASSERT_OK(stream->Close());
ASSERT_TRUE(weak_fs.expired());
AssertObjectContents(client_.get(), "bucket", "newfile99", "some other data");

ASSERT_OK(RestoreTestBucket());
}

void TestOpenOutputStreamAbort() {
Expand All @@ -651,8 +647,6 @@ class TestS3FS : public S3TestMixin {
ASSERT_OK(stream->Abort());
ASSERT_EQ(stream->closed(), true);
AssertObjectContents(client_.get(), "bucket", "somefile", "some data");

ASSERT_OK(RestoreTestBucket());
}

void TestOpenOutputStreamDestructor() {
Expand All @@ -662,8 +656,6 @@ class TestS3FS : public S3TestMixin {
// Destructor implicitly closes stream and completes the multipart upload.
stream.reset();
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");

ASSERT_OK(RestoreTestBucket());
}

void TestOpenOutputStreamCloseAsyncDestructor() {
Expand Down Expand Up @@ -696,8 +688,6 @@ class TestS3FS : public S3TestMixin {
});
ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");

ASSERT_OK(RestoreTestBucket());
}

protected:
Expand Down Expand Up @@ -1329,6 +1319,7 @@ TEST_F(TestS3FS, OpenOutputStream) {
combination.ApplyToS3Options(&options_);
MakeFileSystem();
TestOpenOutputStream(combination.allow_delayed_open);
ASSERT_OK(RestoreTestBucket());
}
}

Expand All @@ -1339,6 +1330,7 @@ TEST_F(TestS3FS, OpenOutputStreamAbort) {
combination.ApplyToS3Options(&options_);
MakeFileSystem();
TestOpenOutputStreamAbort();
ASSERT_OK(RestoreTestBucket());
}
}

Expand All @@ -1349,6 +1341,7 @@ TEST_F(TestS3FS, OpenOutputStreamDestructor) {
combination.ApplyToS3Options(&options_);
MakeFileSystem();
TestOpenOutputStreamDestructor();
ASSERT_OK(RestoreTestBucket());
}
}

Expand All @@ -1364,12 +1357,14 @@ TEST_F(TestS3FS, OpenOutputStreamAsync) {

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
TestOpenOutputStreamCloseAsyncFutureDeadlock();
ASSERT_OK(RestoreTestBucket());
}

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
options_.background_writes = false;
MakeFileSystem();
TestOpenOutputStreamCloseAsyncFutureDeadlock();
ASSERT_OK(RestoreTestBucket());
}

TEST_F(TestS3FS, OpenOutputStreamMetadata) {
Expand Down

0 comments on commit 57ff992

Please sign in to comment.