Skip to content

Commit

Permalink
[BUG] Fix Parquet reads with chunk sizing (#2658)
Browse files Browse the repository at this point in the history
Another followup to #2586.

## Problem statement

#2586 incorrectly handles value reading and chunking. In that PR, only
local tests were used. Locally, chunk sizes of up to `128 * 1024` rows
are allowed, so chunk size exceeded the total number of rows to read.
However, non-local reads such as to S3 instead have a default chunk size
of `2048`. This results in a scenario where chunk size is less than the
total number of rows to read.

When this happens, if the row count of a data page aligns with the chunk
size, we continue reading the next data page to see if the last row
contains more leaf values. If the first value belongs to a new record,
then the number of rows seen would be incremented. It would then always
be the case that `rows read > additional rows to read (which is 0)`, and
the exit condition of `rows read == additional rows to read` is never
fulfilled, so we continue reading values into a chunk until the page
runs out of values. This could repeat for every subsequent data page.

The end result is that we can have columns with incorrectly sized chunks
that are incongruous with the chunk sizes of other columns, causing Daft
to error out.

**TLDR: chunk sizes were not being respected during parquet reads.**

## Solution

Instead of checking whether the `rows read == additional rows to read`
condition at the end of the loop where we iterate through a page's
values, we move the check to the start and `peek` at the value to decide
if we should continue iterating for the current chunk.

Additionally, we modify the change in #2643 so that the remaining number
of values to read are zeroed out iff the number of rows read is equal to
the total number of rows to read, and not when the number of rows read
is equal to the number of additional rows to read (which only applies to
the current chunk).

## Example

As an example, consider a parquet file with the schema `<nested
struct<field0 string, field1 string>`. Let `field0` be dictionary
encoded while `field1` uses fallback encoding. Given `4097` rows we
might get the following page layout:

```
Column: nested.field0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  2       6.00 B     12 B      
  0-1    data  S R  4097    0.00 B     13 B                0       "a" / "arr"


Column: nested.field1
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1024    2.931 kB   2.931 MB  
  0-1    data  S R  1024    1.26 B     1.261 kB            0       "a" / "zzrnqokwuddojvhlcrdnmtrad..."
  0-2    data  S _  1024    2.934 kB   2.934 MB            0       "aabhtzyyrmvztyiwyaafodbmh..." / "zyxodymgoooorpuarkpkiqjvi..."
  0-3    data  S _  1024    2.934 kB   2.934 MB            0       "aadupgntgjltmsrybltkimurs..." / "zyqwxllnhjdqrjtyeclpthwwy..."
  0-4    data  S _  1024    2.934 kB   2.934 MB            0       "aaazxwchmmahxhexenhbcssft..." / "zzlfnynbvwkertfrinofztjrk..."
  0-5    data  S _  1       2.939 kB   2.939 kB            0       "mmbzhmnbexeqknrnjftfiawsy..." / "mmbzhmnbexeqknrnjftfiawsy..."
```

Before this PR, after page `0-2` is read, we've read enough rows to fill
up a chunk of size `2048` (which is our default chunk size when reading
from S3). However, from #2586, we still read page `0-3` to check if the
row contains multiple leaf values. Before #2643, what happens is that we
see a repetition level of 0, so we increment the number of rows seen, so
`rows seen > additional rows to read for the page`, and we never fulfill
the strict `rows seen == additional rows to read` condition to stop
reading to a chunk. After #2586, we correctly note that the chunk is
full and exit, but we also consumed a value that belongs to the next
chunk, so we end up with insufficient values in the end.
  • Loading branch information
desmondcheongzx authored Aug 20, 2024
1 parent dbcc4bb commit fbce3ac
Show file tree
Hide file tree
Showing 18 changed files with 257 additions and 87 deletions.
2 changes: 2 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ class ParquetSourceConfig:
coerce_int96_timestamp_unit: PyTimeUnit | None
field_id_mapping: dict[int, PyField] | None
row_groups: list[list[int]] | None
chunk_size: int | None

def __init__(
self,
coerce_int96_timestamp_unit: PyTimeUnit | None = None,
field_id_mapping: dict[int, PyField] | None = None,
row_groups: list[list[int]] | None = None,
chunk_size: int | None = None,
): ...

class CsvSourceConfig:
Expand Down
3 changes: 2 additions & 1 deletion daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def read_parquet(
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
_multithreaded_io: Optional[bool] = None,
_chunk_size: Optional[int] = None, # A hidden parameter for testing purposes.
) -> DataFrame:
"""Creates a DataFrame from Parquet file(s)
Expand Down Expand Up @@ -79,7 +80,7 @@ def read_parquet(
raise ValueError("row_groups are only supported when reading multiple non-globbed/wildcarded files")

file_format_config = FileFormatConfig.from_parquet_config(
ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups)
ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups, chunk_size=_chunk_size)
)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
Expand Down
3 changes: 2 additions & 1 deletion src/arrow2/src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ impl<O: Offset, I: Pages> Iterator for NestedIter<O, I> {
&mut self.iter,
&mut self.items,
&mut self.dict,
(&mut self.rows_remaining, &mut self.values_remaining),
&mut self.rows_remaining,
&mut self.values_remaining,
&self.init,
self.chunk_size,
&BinaryDecoder::<O>::default(),
Expand Down
3 changes: 2 additions & 1 deletion src/arrow2/src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ impl<I: Pages> Iterator for NestedIter<I> {
&mut self.iter,
&mut self.items,
&mut None,
(&mut self.rows_remaining, &mut self.values_remaining),
&mut self.rows_remaining,
&mut self.values_remaining,
&self.init,
self.chunk_size,
&BooleanDecoder::default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub fn next_dict<K: DictionaryKey, I: Pages, F: Fn(&DictPage) -> Box<dyn Array>>
init,
items,
None,
remaining,
// TODO(issue#2537): Daft does not currently support Arrow's dictionary logical
// type, so we don't currently have a way to encounter or test this code.
// We should fix this when support for dictionary type is added by replacing this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ impl<I: Pages> Iterator for NestedIter<I> {
&mut self.iter,
&mut self.items,
&mut self.dict,
(&mut self.rows_remaining, &mut self.values_remaining),
&mut self.rows_remaining,
&mut self.values_remaining,
&self.init,
self.chunk_size,
&BinaryDecoder { size: self.size },
Expand Down
141 changes: 104 additions & 37 deletions src/arrow2/src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,16 +350,17 @@ impl NestedState {

/// Extends `items` by consuming `page`, first trying to complete the last `item`
/// and extending it if more are needed
#[allow(clippy::too_many_arguments)]
pub(super) fn extend<'a, D: NestedDecoder<'a>>(
page: &'a DataPage,
init: &[InitNested],
items: &mut VecDeque<(NestedState, D::DecodedState)>,
dict: Option<&'a D::Dictionary>,
remaining: (&mut usize, &mut usize),
rows_remaining: &mut usize,
values_remaining: &mut usize,
decoder: &D,
chunk_size: Option<usize>,
) -> Result<()> {
let (rows_remaining, values_remaining) = remaining;
let mut values_page = decoder.build_state(page, dict)?;
let mut page = NestedPage::try_new(page)?;

Expand All @@ -382,15 +383,30 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
&mut page,
&mut values_page,
&mut nested.nested,
*rows_remaining,
values_remaining,
&mut decoded,
decoder,
additional,
)?;
assert!(
*rows_remaining >= nested.len() - existing,
"Rows remaining ({}) is less than the number of new rows seen ({}). Please file an issue.",
*rows_remaining,
nested.len() - existing,
);
assert!(
nested.len() <= chunk_size,
"Number of rows in the chunk ({}) exceeds the chunk size ({}). Please file an issue.",
nested.len(),
chunk_size,
);
*rows_remaining -= nested.len() - existing;
items.push_back((nested, decoded));

while page.len() > 0 && *rows_remaining > 0 {
// If we've filled the current chunk, but there are rows remaining in the current page, start
// filling up new chunks.
while page.len() > 0 && (*rows_remaining > 0 || *values_remaining > 0) {
let additional = chunk_size.min(*rows_remaining);

let mut nested = init_nested(init, additional);
Expand All @@ -399,26 +415,64 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
&mut page,
&mut values_page,
&mut nested.nested,
*rows_remaining,
values_remaining,
&mut decoded,
decoder,
additional,
)?;
assert!(
*rows_remaining >= nested.len(),
"Rows remaining ({}) is less than the number of new rows seen ({}). Please file an issue.",
*rows_remaining,
nested.len(),
);
assert!(
nested.len() <= chunk_size,
"Number of rows in the chunk ({}) exceeds the chunk size ({}). Please file an issue.",
nested.len(),
chunk_size,
);
*rows_remaining -= nested.len();
items.push_back((nested, decoded));
}
Ok(())
}

/// Helper function that fills a chunk with nested values decoded from the current page. At most
/// `additional` values will be added to the current chunk.
///
///
/// # Arguments
///
/// * `page` - The repetition and definition levels for the current Parquet page.
/// * `values_state` - The state of our nested values.
/// * `nested` - The state of our nested data types.
/// * `rows_remaining` - The global number of top-level rows that remain in the current row group.
/// * `values_remaining` - The global number of leaf values that remain in the current row group.
/// * `decoded` - The state of our decoded values.
/// * `decoder` - The decoder for the leaf-level type.
/// * `additional` - The number of top-level rows to read for the current chunk. This is the
/// min of `chunk size - number of rows existing in the current chunk` and
/// `rows_remaining`.
#[allow(clippy::too_many_arguments)]
fn extend_offsets2<'a, D: NestedDecoder<'a>>(
page: &mut NestedPage<'a>,
values_state: &mut D::State,
nested: &mut [Box<dyn Nested>],
rows_remaining: usize,
values_remaining: &mut usize,
decoded: &mut D::DecodedState,
decoder: &D,
additional: usize,
) -> Result<()> {
// Check that we have at least one value (which can be null) per row.
assert!(
*values_remaining >= rows_remaining,
"Values remaining({}) is lower than the number of rows remaining ({}). Please file an issue.",
*values_remaining,
rows_remaining,
);
let max_depth = nested.len();

let mut cum_sum = vec![0u32; max_depth + 1];
Expand All @@ -433,26 +487,39 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
cum_rep[i + 1] = cum_rep[i] + delta;
}

let mut rows = 0;
while let Some((rep, def)) = page.iter.next() {
let mut rows_seen = 0;
loop {
let next_rep = page.iter.peek().map(|x| x.0.as_ref()).transpose().unwrap();
match next_rep {
Some(next_rep) => {
// A repetition level of 0 indicates that we're reading a new record. For more
// details, see:
// Melnik, Sergey et al. “Dremel.” Proceedings of the VLDB Endowment 13 (2020): 3461 - 3472.
if *next_rep == 0 {
// A row might have values that overflow across multiple data pages. If the
// overflowing row is the last row in our result (either because it is the last
// row in the column, or in the limit(), or in the show()), then we might have
// continued reading data pages despite `rows_remaining <= rows_seen`. We only
// know that we've read all values to read when either `values_remaining` is 0,
// or when `rows_remaining <= rows_seen` and we see a new record. In the latter
// case, the remaining values lie outside of the rows we're retrieving, so we
// zero out `values_remaining`.
if rows_seen >= rows_remaining {
*values_remaining = 0;
break;
}
if rows_seen >= additional {
break;
}
rows_seen += 1;
}
}
None => break,
}
let (rep, def) = page.iter.next().unwrap();
*values_remaining -= 1;
let rep = rep?;
let def = def?;
if rep == 0 {
// A row might have values that overflow across multiple data pages. If the overflowing
// row is the last row in our result (either because it is the last row in the column,
// or in the limit(), or in the show()), then we might have continued reading data pages
// despite having read `additional` rows (where `additional` could be 0). We only know
// that we've read all values to read when either `values_remaining` is 0, or we have
// read `additional` rows and see a repetition level of 0 (which tells us that we're
// reading a new record). In the latter case, the remaining values lie outside of the
// rows we're retrieving, so we zero out `values_remaining`.
if rows == additional {
*values_remaining = 0;
break;
}
rows += 1;
}

let mut is_required = false;
for depth in 0..max_depth {
Expand Down Expand Up @@ -481,30 +548,30 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
}
}
}

let next_rep = page.iter.peek().map(|x| x.0.as_ref()).transpose().unwrap();
match next_rep {
Some(next_rep) => {
if *next_rep == 0 && rows == additional {
// If we see a repetition level of 0, we know that we've read all values in
// `additional` rows, and the remaining values lie outside of the rows we're
// retrieving, so we zero out `values_remaining`.
*values_remaining = 0;
break;
}
}
None => break,
}
}
assert!(
rows_seen <= rows_remaining,
"Rows seen ({}) is greater than the number of rows remaining ({}). Please file an issue.",
rows_seen,
rows_remaining,
);
assert!(
rows_seen <= additional,
"Rows seen ({}) is greater than the additional number of rows to read in the current data page ({}). Please file an issue.",
rows_seen,
additional,
);
Ok(())
}

#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn next<'a, I, D>(
iter: &'a mut I,
items: &mut VecDeque<(NestedState, D::DecodedState)>,
dict: &'a mut Option<D::Dictionary>,
remaining: (&mut usize, &mut usize),
rows_remaining: &mut usize,
values_remaining: &mut usize,
init: &[InitNested],
chunk_size: Option<usize>,
decoder: &D,
Expand All @@ -513,7 +580,6 @@ where
I: Pages,
D: NestedDecoder<'a>,
{
let (rows_remaining, values_remaining) = remaining;
// front[a1, a2, a3, ...]back
if items.len() > 1 {
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
Expand Down Expand Up @@ -554,7 +620,8 @@ where
init,
items,
dict.as_ref(),
(rows_remaining, values_remaining),
rows_remaining,
values_remaining,
decoder,
chunk_size,
);
Expand Down
3 changes: 2 additions & 1 deletion src/arrow2/src/io/parquet/read/deserialize/null/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ where
&mut self.iter,
&mut self.items,
&mut None,
(&mut self.rows_remaining, &mut self.values_remaining),
&mut self.rows_remaining,
&mut self.values_remaining,
&self.init,
self.chunk_size,
&self.decoder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ where
&mut self.iter,
&mut self.items,
&mut self.dict,
(&mut self.rows_remaining, &mut self.values_remaining),
&mut self.rows_remaining,
&mut self.values_remaining,
&self.init,
self.chunk_size,
&self.decoder,
Expand Down
Loading

0 comments on commit fbce3ac

Please sign in to comment.