From 4fc2731a3ceaf7dd5b1dce6f29bf7cad0ab2f13e Mon Sep 17 00:00:00 2001
From: Sutou Kouhei
Date: Thu, 17 Aug 2023 10:10:36 +0900
Subject: [PATCH] GH-37144: [C++] Add RecordBatchFileReader::To{RecordBatches,Table} (#37167)

### Rationale for this change

`RecordBatchReader` has them but `RecordBatchFileReader` doesn't. They are convenient.

### What changes are included in this PR?

Add them.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #37144

Authored-by: Sutou Kouhei
Signed-off-by: Sutou Kouhei
---
 cpp/src/arrow/ipc/read_write_test.cc | 28 +++++++++++++++++++++++-----
 cpp/src/arrow/ipc/reader.cc          | 16 ++++++++++++++++
 cpp/src/arrow/ipc/reader.h           |  6 ++++++
 cpp/src/arrow/table.h                |  2 +-
 4 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index ae5fde39d8550..6b4f70763708c 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1184,6 +1184,13 @@ struct FileWriterHelper {
     return reader->metadata();
   }
 
+  Result<std::shared_ptr<Table>> ReadAll() {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    ARROW_ASSIGN_OR_RAISE(auto reader,
+                          RecordBatchFileReader::Open(buf_reader.get(), footer_offset_));
+    return reader->ToTable();
+  }
+
   std::shared_ptr buffer_;
   std::unique_ptr sink_;
   std::shared_ptr writer_;
@@ -1875,6 +1882,21 @@ TEST(TestIpcFileFormat, FooterMetaData) {
   ASSERT_TRUE(out_metadata->Equals(*metadata));
 }
 
+TEST(TestIpcFileFormat, ReaderToTable) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+
+  FileWriterHelper helper;
+  ASSERT_OK(helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
+  ASSERT_OK(helper.WriteBatch(batch));
+  ASSERT_OK(helper.WriteBatch(batch));
+  ASSERT_OK(helper.Finish());
+
+  ASSERT_OK_AND_ASSIGN(auto out_table, helper.ReadAll());
+  ASSERT_OK_AND_ASSIGN(auto expected_table, Table::FromRecordBatches({batch, batch}));
+  ASSERT_TABLES_EQUAL(*expected_table, *out_table);
+}
+
 TEST_F(TestWriteRecordBatch, RawAndSerializedSizes) {
   // ARROW-8823: Recording total raw and serialized record batch sizes in WriteStats
   FileWriterHelper helper;
@@ -3012,11 +3034,7 @@ class PreBufferingTest : public ::testing::TestWithParam {
     auto read_options = IpcReadOptions::Defaults();
     EXPECT_OK_AND_ASSIGN(auto reader,
                          RecordBatchFileReader::Open(buffer_reader.get(), read_options));
-    std::vector<std::shared_ptr<RecordBatch>> expected_batches;
-    for (int i = 0; i < reader->num_record_batches(); i++) {
-      EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
-      expected_batches.push_back(expected_batch);
-    }
+    EXPECT_OK_AND_ASSIGN(auto expected_batches, reader->ToRecordBatches());
     return expected_batches;
   }
 
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 125a8f2d4158c..0def0e036e3c1 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -44,6 +44,7 @@
 #include "arrow/record_batch.h"
 #include "arrow/sparse_tensor.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_util.h"
@@ -1865,6 +1866,21 @@ Future<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::OpenAsync(
       .Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return result; });
 }
 
+Result<RecordBatchVector> RecordBatchFileReader::ToRecordBatches() {
+  RecordBatchVector batches;
+  const auto n = num_record_batches();
+  for (int i = 0; i < n; ++i) {
+    ARROW_ASSIGN_OR_RAISE(auto batch, ReadRecordBatch(i));
+    batches.emplace_back(std::move(batch));
+  }
+  return batches;
+}
+
+Result<std::shared_ptr<Table>> RecordBatchFileReader::ToTable() {
+  ARROW_ASSIGN_OR_RAISE(auto batches, ToRecordBatches());
+  return Table::FromRecordBatches(schema(), std::move(batches));
+}
+
 Future SelectiveIpcFileRecordBatchGenerator::operator()() {
   int index = index_++;
 
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 1698abd14b48f..9440fcaba92d6 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -229,6 +229,12 @@ class ARROW_EXPORT RecordBatchFileReader
       const io::IOContext& io_context = io::default_io_context(),
       const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(),
       arrow::internal::Executor* executor = NULLPTR) = 0;
+
+  /// \brief Collect all batches as a vector of record batches
+  Result<RecordBatchVector> ToRecordBatches();
+
+  /// \brief Collect all batches and concatenate as arrow::Table
+  Result<std::shared_ptr<Table>> ToTable();
 };
 
 /// \brief A general listener class to receive events.
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2171f3331add4..940ff73ae983f 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -75,7 +75,7 @@ class ARROW_EXPORT Table {
 
   /// \brief Construct a Table from a RecordBatchReader.
   ///
-  /// \param[in] reader the arrow::Schema for each batch
+  /// \param[in] reader the arrow::RecordBatchReader that produces batches
   static Result<std::shared_ptr<Table>> FromRecordBatchReader(RecordBatchReader* reader);
 
   /// \brief Construct a Table from RecordBatches, using schema supplied by the first