Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 4, 2024
1 parent 50219ef commit c13fe56
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ class ColumnReaderImplBase {
max_size -= def_levels_bytes;
}

current_page_may_have_nulls_ = max_def_level_ > 0 || max_rep_level_ > 0;
return levels_byte_size;
}

Expand Down Expand Up @@ -857,6 +858,7 @@ class ColumnReaderImplBase {
static_cast<int>(num_buffered_values_), buffer);
}

current_page_may_have_nulls_ = page.num_nulls() > 0;
return total_levels_length;
}

Expand Down Expand Up @@ -926,6 +928,8 @@ class ColumnReaderImplBase {
// Not set for flat schemas.
LevelDecoder repetition_level_decoder_;

bool current_page_may_have_nulls_;

// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
// non-repeated, required columns, this is equal to the number of encoded
Expand Down Expand Up @@ -1214,9 +1218,13 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
null_count = validity_io.null_count;
*values_read = validity_io.values_read;

total_values =
this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
if (null_count != 0) {
total_values =
this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
} else {
total_values = this->ReadValues(*values_read, values);
}
}
*levels_read = num_def_levels;
*null_count_out = null_count;
Expand Down Expand Up @@ -1932,6 +1940,16 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
// levels_position_ must already be incremented based on number of records
// read.
ARROW_DCHECK_GE(levels_position_, start_levels_position);

if (!this->current_page_may_have_nulls_) {
*values_to_read = levels_position_ - start_levels_position;
*null_count = 0;
ReadValuesDense(*values_to_read);
::arrow::bit_util::SetBitsTo(valid_bits_->mutable_data(), values_written_,
*values_to_read, true);
return;
}

ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = levels_position_ - start_levels_position;
validity_io.valid_bits = valid_bits_->mutable_data();
Expand All @@ -1943,7 +1961,11 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
*null_count = validity_io.null_count;
ARROW_DCHECK_GE(*values_to_read, 0);
ARROW_DCHECK_GE(*null_count, 0);
ReadValuesSpaced(validity_io.values_read, *null_count);
if (null_count != 0) {
ReadValuesSpaced(validity_io.values_read, *null_count);
} else {
ReadValuesDense(validity_io.values_read);
}
}

// Return number of logical records read.
Expand Down Expand Up @@ -2088,6 +2110,10 @@ class FLBARecordReader final : public TypedRecordReader<FLBAType>,
}

void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
if (null_count == 0) {
ReadValuesDense(values_to_read);
return;
}
uint8_t* valid_bits = valid_bits_->mutable_data();
const int64_t valid_bits_offset = values_written_;
auto values = ValuesHead<FLBA>();
Expand All @@ -2099,11 +2125,7 @@ class FLBARecordReader final : public TypedRecordReader<FLBAType>,

PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
if (null_count == 0) {
UnsafeAppendDense(values, num_decoded);
} else {
UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
}
UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
ResetValues();
}

Expand Down

0 comments on commit c13fe56

Please sign in to comment.