Skip to content

Commit

Permalink
GH-37144: [C++] Add RecordBatchFileReader::To{RecordBatches,Table} (#…
Browse files Browse the repository at this point in the history
…37167)

### Rationale for this change

`RecordBatchReader` has `ToRecordBatches()` and `ToTable()`, but `RecordBatchFileReader` doesn't. They are convenient for reading a whole file in one call.

### What changes are included in this PR?

Add `RecordBatchFileReader::ToRecordBatches()` and `RecordBatchFileReader::ToTable()`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #37144

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Aug 17, 2023
1 parent 4680ca5 commit 4fc2731
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
28 changes: 23 additions & 5 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,13 @@ struct FileWriterHelper {
return reader->metadata();
}

// Re-open the written bytes as an IPC file and return all of its contents
// as a single Table. Any error from opening the file is returned as-is.
Result<std::shared_ptr<Table>> ReadAll() {
  auto source = std::make_shared<io::BufferReader>(buffer_);
  ARROW_ASSIGN_OR_RAISE(
      auto file_reader, RecordBatchFileReader::Open(source.get(), footer_offset_));
  return file_reader->ToTable();
}

std::shared_ptr<ResizableBuffer> buffer_;
std::unique_ptr<io::BufferOutputStream> sink_;
std::shared_ptr<RecordBatchWriter> writer_;
Expand Down Expand Up @@ -1875,6 +1882,21 @@ TEST(TestIpcFileFormat, FooterMetaData) {
ASSERT_TRUE(out_metadata->Equals(*metadata));
}

// Writes the same record batch twice to an IPC file, then checks that
// reading the whole file back as a Table yields a table equal to one built
// directly from those two batches.
TEST(TestIpcFileFormat, ReaderToTable) {
  std::shared_ptr<RecordBatch> int_batch;
  ASSERT_OK(MakeIntRecordBatch(&int_batch));
  const std::vector<std::shared_ptr<RecordBatch>> written_batches = {int_batch,
                                                                     int_batch};

  FileWriterHelper writer_helper;
  ASSERT_OK(writer_helper.Init(int_batch->schema(), IpcWriteOptions::Defaults()));
  for (const auto& written : written_batches) {
    ASSERT_OK(writer_helper.WriteBatch(written));
  }
  ASSERT_OK(writer_helper.Finish());

  ASSERT_OK_AND_ASSIGN(auto actual, writer_helper.ReadAll());
  ASSERT_OK_AND_ASSIGN(auto expected, Table::FromRecordBatches(written_batches));
  ASSERT_TABLES_EQUAL(*expected, *actual);
}

TEST_F(TestWriteRecordBatch, RawAndSerializedSizes) {
// ARROW-8823: Recording total raw and serialized record batch sizes in WriteStats
FileWriterHelper helper;
Expand Down Expand Up @@ -3012,11 +3034,7 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {
auto read_options = IpcReadOptions::Defaults();
EXPECT_OK_AND_ASSIGN(auto reader,
RecordBatchFileReader::Open(buffer_reader.get(), read_options));
std::vector<std::shared_ptr<RecordBatch>> expected_batches;
for (int i = 0; i < reader->num_record_batches(); i++) {
EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
expected_batches.push_back(expected_batch);
}
EXPECT_OK_AND_ASSIGN(auto expected_batches, reader->ToRecordBatches());
return expected_batches;
}

Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "arrow/record_batch.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
Expand Down Expand Up @@ -1865,6 +1866,21 @@ Future<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::OpenAsync(
.Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return result; });
}

// Read every record batch in the file, in index order, and return them as a
// vector. An error from ReadRecordBatch() is propagated to the caller and no
// partial result is returned.
Result<RecordBatchVector> RecordBatchFileReader::ToRecordBatches() {
  const auto n = num_record_batches();
  RecordBatchVector batches;
  // The number of batches is known before the loop, so allocate the storage
  // once instead of letting the vector grow repeatedly.
  if (n > 0) {
    batches.reserve(static_cast<RecordBatchVector::size_type>(n));
  }
  for (int i = 0; i < n; ++i) {
    ARROW_ASSIGN_OR_RAISE(auto batch, ReadRecordBatch(i));
    batches.emplace_back(std::move(batch));
  }
  return batches;
}

// Read all record batches from the file and assemble them into a Table that
// uses this reader's schema. A failure while reading the batches is returned
// before any table is built.
Result<std::shared_ptr<Table>> RecordBatchFileReader::ToTable() {
  ARROW_ASSIGN_OR_RAISE(RecordBatchVector all_batches, ToRecordBatches());
  auto table_schema = schema();
  return Table::FromRecordBatches(std::move(table_schema), std::move(all_batches));
}

Future<SelectiveIpcFileRecordBatchGenerator::Item>
SelectiveIpcFileRecordBatchGenerator::operator()() {
int index = index_++;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ class ARROW_EXPORT RecordBatchFileReader
const io::IOContext& io_context = io::default_io_context(),
const io::CacheOptions cache_options = io::CacheOptions::LazyDefaults(),
arrow::internal::Executor* executor = NULLPTR) = 0;

/// \brief Collect all batches as a vector of record batches
///
/// Reads each record batch in the file with ReadRecordBatch(), from index 0
/// up to num_record_batches() - 1.
///
/// \return the batches in index order, or the first error hit while reading
Result<RecordBatchVector> ToRecordBatches();

/// \brief Collect all batches and concatenate as arrow::Table
///
/// The table is built from the result of ToRecordBatches() together with
/// this reader's schema().
///
/// \return the resulting table, or an error if reading the batches or
/// building the table fails
Result<std::shared_ptr<Table>> ToTable();
};

/// \brief A general listener class to receive events.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ARROW_EXPORT Table {

/// \brief Construct a Table from a RecordBatchReader.
///
/// \param[in] reader the arrow::Schema for each batch
/// \param[in] reader the arrow::RecordBatchReader that produces batches
static Result<std::shared_ptr<Table>> FromRecordBatchReader(RecordBatchReader* reader);

/// \brief Construct a Table from RecordBatches, using schema supplied by the first
Expand Down

0 comments on commit 4fc2731

Please sign in to comment.