GH-36189: [C++][Parquet] StreamReader::SkipRows() skips to incorrect place in multi-row-group files (apache#36191)

### Rationale for this change

The behavior of Parquet `StreamReader::SkipRows()` is incorrect for files with multiple row groups: a missing pair of parentheses in the computation of the current row's offset within the current row group causes the number of rows remaining in that row group to be miscalculated.
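For illustration, here is a minimal sketch of the miscalculation using the variable names from `SkipRows()`; the concrete numbers (a 10-row row group starting at file-wide row 30, with the reader positioned at row 33) are assumed for the example and are not taken from the PR:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Illustrative values only: a 10-row row group whose first row is file-wide
  // row 30, with the reader currently positioned at file-wide row 33.
  int64_t num_rows_in_row_group = 10;
  int64_t current_row = 33;           // file-wide index of the current row
  int64_t row_group_row_offset = 30;  // file-wide index of the row group's first row

  // Old expression: subtracts both terms from the row-group size, so the
  // row-group start offset is counted against the remaining rows as well.
  int64_t buggy = num_rows_in_row_group - current_row - row_group_row_offset;    // -53

  // Fixed expression: subtract only the position *within* the row group.
  int64_t fixed = num_rows_in_row_group - (current_row - row_group_row_offset);  // 7

  std::cout << buggy << " vs " << fixed << std::endl;
  return 0;
}
```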

### What changes are included in this PR?

A unit test case demonstrating the failure and a trivial fix. 

### Are these changes tested?

Yes 

### Are there any user-facing changes?

No

I am not sure how critical this bug is, given how long it has existed in the code without anyone seeming to notice. There are two manifestations of this bug that can give the user a wrong impression of what is in their data (see the sketch after this list):

* `SkipRows()` sometimes returns a negative value, which is unexpected given the nature of the API, so the user at least has a signal that something is wrong (this is how I discovered the bug)
* the `SkipRows()` call can set the `eof` flag prematurely, which might lead the user to think the file contains less data than it actually does.
* Closes: apache#36189
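The premature-`eof` manifestation above can at least be detected defensively from application code. Below is a hedged sketch that mirrors the reader setup used in the tests and checks both the return value of `SkipRows()` and `eof()`; the file name is a placeholder, not something produced by this PR:

```cpp
#include <cstdint>
#include <iostream>

#include "arrow/io/file.h"
#include "parquet/exception.h"
#include "parquet/stream_reader.h"

int main() {
  // "multi_rowgroup.parquet" is a placeholder; any Parquet file with more than
  // one row group readable by StreamReader would do.
  PARQUET_ASSIGN_OR_THROW(
      auto infile, ::arrow::io::ReadableFile::Open("multi_rowgroup.parquet"));
  parquet::StreamReader reader{parquet::ParquetFileReader::Open(infile)};

  // Before the fix, skipping across a row-group boundary could return a
  // negative count or set eof() too early; a caller can detect the mismatch
  // by checking both.
  constexpr int64_t kRowsToSkip = 33;
  const int64_t skipped = reader.SkipRows(kRowsToSkip);
  if (skipped != kRowsToSkip || reader.eof()) {
    std::cerr << "expected to skip " << kRowsToSkip << " rows, skipped "
              << skipped << std::endl;
  }
  return 0;
}
```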

Lead-authored-by: Paul Rosenfeld <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: KarateSnowMachine <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
3 people authored Aug 3, 2023
1 parent 0de6673 commit 710f960
Showing 2 changed files with 101 additions and 13 deletions.
cpp/src/parquet/stream_reader.cc (2 changes: 1 addition & 1 deletion)
@@ -441,7 +441,7 @@ int64_t StreamReader::SkipRows(int64_t num_rows_to_skip) {
while (!eof_ && (num_rows_remaining_to_skip > 0)) {
int64_t num_rows_in_row_group = row_group_reader_->metadata()->num_rows();
int64_t num_rows_remaining_in_row_group =
- num_rows_in_row_group - current_row_ - row_group_row_offset_;
+ num_rows_in_row_group - (current_row_ - row_group_row_offset_);

if (num_rows_remaining_in_row_group > num_rows_remaining_to_skip) {
for (auto reader : column_readers_) {
cpp/src/parquet/stream_reader_test.cc (112 changes: 100 additions & 12 deletions)
@@ -17,13 +17,11 @@

#include "parquet/stream_reader.h"

#include <fcntl.h>
#include <gtest/gtest.h>

#include <chrono>
#include <ctime>
#include <memory>
#include <utility>

#include "arrow/io/file.h"
#include "arrow/util/decimal.h"
@@ -38,7 +36,7 @@ using optional = StreamReader::optional<T>;
using ::std::nullopt;

struct TestData {
- static void init() { std::time(&ts_offset_); }
+ static void Init() { std::time(&ts_offset_); }

static constexpr int num_rows = 2000;

@@ -145,18 +143,18 @@ constexpr int TestData::num_rows;

class TestStreamReader : public ::testing::Test {
public:
- TestStreamReader() { createTestFile(); }
+ TestStreamReader() { CreateTestFile(); }

protected:
const char* GetDataFile() const { return "stream_reader_test.parquet"; }

- void SetUp() {
+ void SetUp() override {
PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(GetDataFile()));
auto file_reader = parquet::ParquetFileReader::Open(infile);
reader_ = StreamReader{std::move(file_reader)};
}

- void TearDown() { reader_ = StreamReader{}; }
+ void TearDown() override { reader_ = StreamReader{}; }

std::shared_ptr<schema::GroupNode> GetSchema() {
schema::NodeVector fields;
@@ -201,15 +199,15 @@ class TestStreamReader : public ::testing::Test {
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

- void createTestFile() {
+ void CreateTestFile() {
PARQUET_ASSIGN_OR_THROW(auto outfile,
::arrow::io::FileOutputStream::Open(GetDataFile()));

auto file_writer = ParquetFileWriter::Open(outfile, GetSchema());

StreamWriter os{std::move(file_writer)};

- TestData::init();
+ TestData::Init();

for (auto i = 0; i < TestData::num_rows; ++i) {
os << TestData::GetBool(i);
@@ -586,7 +584,7 @@ TEST_F(TestStreamReader, SkipColumns) {

class TestOptionalFields : public ::testing::Test {
public:
- TestOptionalFields() { createTestFile(); }
+ TestOptionalFields() { CreateTestFile(); }

protected:
const char* GetDataFile() const { return "stream_reader_test_optional_fields.parquet"; }
@@ -644,13 +642,13 @@ class TestOptionalFields : public ::testing::Test {
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

- void createTestFile() {
+ void CreateTestFile() {
PARQUET_ASSIGN_OR_THROW(auto outfile,
::arrow::io::FileOutputStream::Open(GetDataFile()));

StreamWriter os{ParquetFileWriter::Open(outfile, GetSchema())};

- TestData::init();
+ TestData::Init();

for (auto i = 0; i < TestData::num_rows; ++i) {
os << TestData::GetOptBool(i);
@@ -732,7 +730,7 @@ TEST_F(TestOptionalFields, ReadOptionalFieldAsRequiredField) {
_provided_ that the optional value is available.
This can be useful if a schema is changed such that a required
- field beomes optional. Applications can continue reading the
+ field becomes optional. Applications can continue reading the
field as if it were mandatory and do not need to be changed if the
field value is always provided.
@@ -947,5 +945,95 @@ TEST_F(TestReadingDataFiles, ByteArrayDecimal) {
EXPECT_EQ(i, 25);
}

class TestMultiRowGroupStreamReader : public ::testing::Test {
protected:
const char* GetDataFile() const { return "stream_reader_multirowgroup_test.parquet"; }

void SetUp() override {
CreateTestFile();
PARQUET_ASSIGN_OR_THROW(auto infile, ::arrow::io::ReadableFile::Open(GetDataFile()));
auto file_reader = parquet::ParquetFileReader::Open(infile);
reader_ = StreamReader{std::move(file_reader)};
}

void TearDown() override { reader_ = StreamReader{}; }

std::shared_ptr<schema::GroupNode> GetSchema() {
schema::NodeVector fields;
fields.push_back(schema::PrimitiveNode::Make("row_group_number", Repetition::REQUIRED,
Type::INT32, ConvertedType::UINT_16));

fields.push_back(schema::PrimitiveNode::Make("row_number", Repetition::REQUIRED,
Type::INT64, ConvertedType::UINT_64));

return std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}

void CreateTestFile() {
PARQUET_ASSIGN_OR_THROW(auto outfile,
::arrow::io::FileOutputStream::Open(GetDataFile()));

auto file_writer = ParquetFileWriter::Open(outfile, GetSchema());

StreamWriter os{std::move(file_writer)};

int nrows = 0;
for (auto group = 0; group < kNumGroups; ++group) {
for (auto i = 0; i < kNumRowsPerGroup; ++i) {
os << static_cast<uint16_t>(group);
os << static_cast<uint64_t>(nrows);
os << EndRow;
nrows++;
}
os.EndRowGroup();
}
}

void ReadRowAndAssertPosition(uint64_t expected_row_num) {
const auto expected_group_num =
static_cast<uint16_t>(expected_row_num / kNumRowsPerGroup);
ASSERT_FALSE(reader_.eof());
uint16_t group_num = 0;
uint64_t row_num = 0;
reader_ >> group_num >> row_num >> EndRow;
ASSERT_EQ(group_num, expected_group_num);
ASSERT_EQ(row_num, expected_row_num);
}

StreamReader reader_;
static constexpr int kNumGroups = 5;
static constexpr int kNumRowsPerGroup = 10;
};

TEST_F(TestMultiRowGroupStreamReader, SkipRows) {
// skip somewhere into the middle of a row group somewhere in the middle of the file
auto current_row = 33;

auto retval = reader_.SkipRows(current_row);
ASSERT_EQ(retval, current_row);
ReadRowAndAssertPosition(current_row);
// reading the row advances by 1
current_row += 1; // row=34

// skip a few more but stay inside the row group
retval = reader_.SkipRows(4);
current_row += 4; // row=38
ASSERT_EQ(retval, 4);
ReadRowAndAssertPosition(current_row);
current_row += 1; // row=39

// skip one more row to get to a group boundary
retval = reader_.SkipRows(1);
current_row += 1; // row=40
ASSERT_EQ(retval, 1);
ReadRowAndAssertPosition(current_row);

// finally, skip off the end of the file
retval = reader_.SkipRows(10);
ASSERT_EQ(retval, 9); // requested to skip 10 but only 9 rows left in file
EXPECT_TRUE(reader_.eof());
}

} // namespace test
} // namespace parquet
