From 6a0c1e748ff77f6bae10147744e180ad408f915e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 29 May 2024 17:35:48 +0200 Subject: [PATCH] GH-41862: [C++][Dataset] Try out possible fix number 1 --- cpp/src/arrow/dataset/dataset_writer.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index c60042dd6fef8..f6f41799ef893 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -330,21 +330,21 @@ class DatasetWriterDirectoryQueue { Status FinishCurrentFile() { if (latest_open_file_) { - ARROW_RETURN_NOT_OK(latest_open_file_->Finish()); + Status st = latest_open_file_->Finish(); latest_open_file_tasks_.reset(); - latest_open_file_ = nullptr; + latest_open_file_.reset(); + ARROW_RETURN_NOT_OK(st); } rows_written_ = 0; return GetNextFilename().Value(¤t_filename_); } Status OpenFileQueue(const std::string& filename) { - auto file_queue = - std::make_unique(schema_, write_options_, writer_state_); - latest_open_file_ = file_queue.get(); + latest_open_file_ = + std::make_shared(schema_, write_options_, writer_state_); // Create a dedicated throttle for write jobs to this file and keep it alive until we // are finished and have closed the file. - auto file_finish_task = [this, file_queue = std::move(file_queue)] { + auto file_finish_task = [this, file_queue = latest_open_file_] { writer_state_->open_files_throttle.Release(1); return Status::OK(); }; @@ -413,9 +413,10 @@ class DatasetWriterDirectoryQueue { Status Finish() { if (latest_open_file_) { - ARROW_RETURN_NOT_OK(latest_open_file_->Finish()); + Status st = latest_open_file_->Finish(); latest_open_file_tasks_.reset(); - latest_open_file_ = nullptr; + latest_open_file_.reset(); + ARROW_RETURN_NOT_OK(st); } used_filenames_.clear(); return Status::OK(); @@ -431,7 +432,7 @@ class DatasetWriterDirectoryQueue { Future<> init_future_; std::string current_filename_; std::unordered_set used_filenames_; - DatasetWriterFileQueue* latest_open_file_ = nullptr; + std::shared_ptr latest_open_file_; std::unique_ptr latest_open_file_tasks_; uint64_t rows_written_ = 0; uint32_t file_counter_ = 0;