From 48632c6aec7075d19152be54a0ede315db13c355 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 9 Aug 2024 14:43:22 -0700 Subject: [PATCH] [FEAT] Remote parquet streaming (#2620) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds streaming reads to remote parquet files. The algorithm is similar to that for local parquet files: Read bytes into memory -> get arrow chunk iterator -> emit table per chunk Q6 Memory Profile: Screenshot 2024-08-07 at 12 38 18 PM Streaming Screenshot 2024-08-07 at 12 38 41 PM Bulk --------- Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- src/daft-parquet/src/file.rs | 171 ++++++++++++++++++++++--- src/daft-parquet/src/read.rs | 51 +++++++- src/daft-parquet/src/stream_reader.rs | 176 ++++++++++++++------------ src/parquet2/src/read/page/stream.rs | 2 +- 4 files changed, 299 insertions(+), 101 deletions(-) diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index fc57e87607..f268efdaa0 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -24,7 +24,9 @@ use crate::{ metadata::read_parquet_metadata, read::ParquetSchemaInferenceOptions, read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass}, - statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu, + statistics, + stream_reader::arrow_column_iters_to_table_iter, + JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu, UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu, UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu, }; @@ -298,6 +300,12 @@ pub(crate) struct ParquetFileReader { } impl ParquetFileReader { + const CHUNK_SIZE: usize = 2048; + // Set to a very high number 256MB to guard against unbounded large + // downloads from remote storage, which likely indicates corrupted Parquet data + // See: https://github.com/Eventual-Inc/Daft/issues/1551 + const MAX_HEADER_SIZE: usize = 256 * 1024 * 1024; + fn new( uri: String, metadata: parquet2::metadata::FileMetaData, @@ -371,9 +379,150 @@ impl ParquetFileReader { pub async fn read_from_ranges_into_table_stream( self, - _ranges: Arc, - ) -> BoxStream<'static, DaftResult> { - todo!("Implement streaming reads for remote parquet files") + ranges: Arc, + maintain_order: bool, + predicate: Option, + original_columns: Option>, + original_num_rows: Option, + ) -> DaftResult>> { + let daft_schema = Arc::new(daft_core::schema::Schema::try_from( + self.arrow_schema.as_ref(), + )?); + + let (senders, receivers): (Vec<_>, Vec<_>) = self + .row_ranges + .iter() + .map(|rg_range| { + let expected_num_chunks = + f32::ceil(rg_range.num_rows as f32 / Self::CHUNK_SIZE as f32) as usize; + crossbeam_channel::bounded(expected_num_chunks) + }) + .unzip(); + + let table_iter_handles = + self.row_ranges + .iter() + .zip(senders.into_iter()) + .map(|(row_range, sender)| { + let uri = self.uri.clone(); + let metadata = self.metadata.clone(); + let arrow_schema = self.arrow_schema.clone(); + let daft_schema = daft_schema.clone(); + let ranges = ranges.clone(); + let predicate = predicate.clone(); + let original_columns = original_columns.clone(); + let row_range = *row_range; + + tokio::task::spawn(async move { + let arr_iter_handles = arrow_schema.fields.iter().map(|field| { + let rt_handle = tokio::runtime::Handle::current(); + let ranges = ranges.clone(); + let uri = uri.clone(); + let field = field.clone(); + let metadata = metadata.clone(); + + tokio::task::spawn(async move { + let 
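/* Per-column task: locate this field's column chunks in the row group, stream and
   decompress their pages over the fetched byte ranges, and expose them as a chunked
   arrow array iterator. */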
rg = metadata + .row_groups + .get(row_range.row_group_index) + .expect("Row Group index should be in bounds"); + let num_rows = + rg.num_rows().min(row_range.start + row_range.num_rows); + let filtered_columns = rg + .columns() + .iter() + .filter(|x| x.descriptor().path_in_schema[0] == field.name) + .collect::>(); + let mut decompressed_iters = + Vec::with_capacity(filtered_columns.len()); + let mut ptypes = Vec::with_capacity(filtered_columns.len()); + let mut num_values = Vec::with_capacity(filtered_columns.len()); + for col in filtered_columns.into_iter() { + num_values.push(col.metadata().num_values as usize); + ptypes.push(col.descriptor().descriptor.primitive_type.clone()); + + let byte_range = { + let (start, len) = col.byte_range(); + let end: u64 = start + len; + start as usize..end as usize + }; + let range_reader = + Box::pin(ranges.get_range_reader(byte_range).await?); + let compressed_page_stream = + get_owned_page_stream_from_column_start( + col, + range_reader, + vec![], + Arc::new(|_, _| true), + Self::MAX_HEADER_SIZE, + ) + .with_context( + |_| UnableToCreateParquetPageStreamSnafu:: { + path: uri.clone(), + }, + )?; + let page_stream = + streaming_decompression(compressed_page_stream); + let pinned_stream = Box::pin(page_stream); + decompressed_iters + .push(StreamIterator::new(pinned_stream, rt_handle.clone())) + } + let arr_iter = column_iter_to_arrays( + decompressed_iters, + ptypes.iter().collect(), + field, + Some(Self::CHUNK_SIZE), + num_rows, + num_values, + )?; + Ok(arr_iter) + }) + }); + + let arr_iters = try_join_all(arr_iter_handles) + .await + .context(JoinSnafu { path: uri.clone() })? + .into_iter() + .collect::>>()?; + + let table_iter = arrow_column_iters_to_table_iter( + arr_iters, + row_range.start, + daft_schema, + uri, + predicate, + original_columns, + original_num_rows, + ); + rayon::spawn(move || { + for table_result in table_iter { + let is_err = table_result.is_err(); + if let Err(crossbeam_channel::TrySendError::Full(_)) = + sender.try_send(table_result) + { + panic!("Parquet stream channel should not be full") + } + if is_err { + break; + } + } + }); + Ok(()) + }) + }); + + let _ = try_join_all(table_iter_handles) + .await + .context(JoinSnafu { path: self.uri })? 
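+            // Each spawned row-group task yields a DaftResult; collecting them here
+            // surfaces any failure in building a table iterator before the stream is returned.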
+ .into_iter() + .collect::>>()?; + + let combined_stream = + futures::stream::iter(receivers.into_iter().map(futures::stream::iter)); + match maintain_order { + true => Ok(Box::pin(combined_stream.flatten())), + false => Ok(Box::pin(combined_stream.flatten_unordered(None))), + } } pub async fn read_from_ranges_into_table( @@ -454,12 +603,8 @@ impl ParquetFileReader { range_reader, vec![], Arc::new(|_, _| true), - // Set to a very high number 256MB to guard against unbounded large - // downloads from remote storage, which likely indicates corrupted Parquet data - // See: https://github.com/Eventual-Inc/Daft/issues/1551 - 256 * 1024 * 1024, + Self::MAX_HEADER_SIZE, ) - .await .with_context(|_| { UnableToCreateParquetPageStreamSnafu:: { path: owned_uri.clone(), @@ -477,7 +622,7 @@ impl ParquetFileReader { decompressed_iters, ptypes.iter().collect(), field.clone(), - Some(2048), + Some(Self::CHUNK_SIZE), num_rows, num_values, ); @@ -641,12 +786,8 @@ impl ParquetFileReader { range_reader, vec![], Arc::new(|_, _| true), - // Set to a very high number 256MB to guard against unbounded large - // downloads from remote storage, which likely indicates corrupted Parquet data - // See: https://github.com/Eventual-Inc/Daft/issues/1551 - 256 * 1024 * 1024, + Self::MAX_HEADER_SIZE, ) - .await .with_context(|_| { UnableToCreateParquetPageStreamSnafu:: { path: owned_uri.clone(), diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 3332d073eb..1ddad000a8 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -410,8 +410,14 @@ async fn stream_parquet_single( Ok(( Arc::new(metadata), parquet_reader - .read_from_ranges_into_table_stream(ranges) - .await, + .read_from_ranges_into_table_stream( + ranges, + maintain_order, + predicate.clone(), + columns_to_return, + num_rows_to_return, + ) + .await?, )) }?; @@ -1014,11 +1020,15 @@ mod tests { use common_error::DaftResult; use daft_io::{IOClient, IOConfig}; + use futures::StreamExt; use super::read_parquet; + use super::stream_parquet; + + const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"; #[test] fn test_parquet_read_from_s3() -> DaftResult<()> { - let file = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet"; + let file = PARQUET_FILE; let mut io_config = IOConfig::default(); io_config.s3.anonymous = true; @@ -1043,4 +1053,39 @@ mod tests { Ok(()) } + + #[test] + fn test_parquet_streaming_read_from_s3() -> DaftResult<()> { + let file = PARQUET_FILE; + + let mut io_config = IOConfig::default(); + io_config.s3.anonymous = true; + + let io_client = Arc::new(IOClient::new(io_config.into())?); + let runtime_handle = daft_io::get_runtime(true)?; + runtime_handle.block_on(async move { + let tables = stream_parquet( + file, + None, + None, + None, + None, + None, + io_client, + None, + &Default::default(), + None, + None, + false, + ) + .await? 
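+        // Collect every table emitted by the stream, then verify the total row count below.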
+ .collect::>() + .await + .into_iter() + .collect::>>()?; + let total_tables_len = tables.iter().map(|t| t.len()).sum::(); + assert_eq!(total_tables_len, 100); + Ok(()) + }) + } } diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index 3530c2d3e6..9939b27d61 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -48,6 +48,76 @@ fn prune_fields_from_schema( } } +pub(crate) fn arrow_column_iters_to_table_iter( + arr_iters: ArrowChunkIters, + row_range_start: usize, + schema_ref: SchemaRef, + uri: String, + predicate: Option, + original_columns: Option>, + original_num_rows: Option, +) -> impl Iterator> { + pub struct ParallelLockStepIter { + pub iters: ArrowChunkIters, + } + impl Iterator for ParallelLockStepIter { + type Item = arrow2::error::Result; + + fn next(&mut self) -> Option { + self.iters.par_iter_mut().map(|iter| iter.next()).collect() + } + } + let par_lock_step_iter = ParallelLockStepIter { iters: arr_iters }; + + // Keep track of the current index in the row group so we can throw away arrays that are not needed + // and slice arrays that are partially needed. + let mut index_so_far = 0; + let owned_schema_ref = schema_ref.clone(); + par_lock_step_iter.into_iter().map(move |chunk| { + let chunk = chunk.with_context(|_| { + super::UnableToCreateChunkFromStreamingFileReaderSnafu { path: uri.clone() } + })?; + let all_series = chunk + .into_iter() + .zip(owned_schema_ref.as_ref().fields.iter()) + .filter_map(|(mut arr, (f_name, _))| { + if (index_so_far + arr.len()) < row_range_start { + // No need to process arrays that are less than the start offset + return None; + } + if index_so_far < row_range_start { + // Slice arrays that are partially needed + let offset = row_range_start.saturating_sub(index_so_far); + arr = arr.sliced(offset, arr.len() - offset); + } + let series_result = + Series::try_from((f_name.as_str(), cast_array_for_daft_if_needed(arr))); + Some(series_result) + }) + .collect::>>()?; + + let len = all_series[0].len(); + if all_series.iter().any(|s| s.len() != len) { + return Err(super::Error::ParquetColumnsDontHaveEqualRows { path: uri.clone() }.into()); + } + index_so_far += len; + + let mut table = Table::new_with_size(owned_schema_ref.clone(), all_series, len) + .with_context(|_| super::UnableToCreateTableFromChunkSnafu { path: uri.clone() })?; + // Apply pushdowns if needed + if let Some(predicate) = &predicate { + table = table.filter(&[predicate.clone()])?; + if let Some(oc) = &original_columns { + table = table.get_columns(oc)?; + } + if let Some(nr) = original_num_rows { + table = table.head(nr)?; + } + } + Ok(table) + }) +} + #[allow(clippy::too_many_arguments)] pub(crate) fn local_parquet_read_into_column_iters( uri: &str, @@ -386,98 +456,40 @@ pub(crate) fn local_parquet_stream( }) .unzip(); - let uri = uri.to_string(); + let owned_uri = uri.to_string(); + let table_iters = + column_iters + .into_iter() + .zip(row_ranges) + .map(move |(rg_col_iter_result, rg_range)| { + let rg_col_iter = rg_col_iter_result?; + let table_iter = arrow_column_iters_to_table_iter( + rg_col_iter, + rg_range.start, + schema_ref.clone(), + owned_uri.clone(), + predicate.clone(), + original_columns.clone(), + original_num_rows, + ); + DaftResult::Ok(table_iter) + }); + rayon::spawn(move || { // Once a row group has been read into memory and we have the column iterators, // we can start processing them in parallel. 
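+        // The per-row-group table iterators built above (via the shared
+        // arrow_column_iters_to_table_iter helper) are paired with their channel
+        // senders and bridged onto the rayon thread pool.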
- let par_column_iters = column_iters.zip(senders).zip(row_ranges).par_bridge(); + let par_table_iters = table_iters.zip(senders).par_bridge(); // For each vec of column iters, iterate through them in parallel lock step such that each iteration // produces a chunk of the row group that can be converted into a table. - par_column_iters.for_each(move |((rg_col_iter_result, tx), rg_range)| { - let rg_col_iter = match rg_col_iter_result { - Ok(iter) => iter, + par_table_iters.for_each(move |(table_iter_result, tx)| { + let table_iter = match table_iter_result { + Ok(t) => t, Err(e) => { - if let Err(crossbeam_channel::TrySendError::Full(_)) = - tx.try_send(Err(e.into())) - { - panic!("Parquet stream channel should not be full") - } + let _ = tx.send(Err(e)); return; } }; - let owned_schema_ref = schema_ref.clone(); - let owned_predicate = predicate.clone(); - let owned_original_columns = original_columns.clone(); - let owned_uri = uri.clone(); - - struct ParallelLockStepIter { - iters: ArrowChunkIters, - } - impl Iterator for ParallelLockStepIter { - type Item = arrow2::error::Result; - - fn next(&mut self) -> Option { - self.iters.par_iter_mut().map(|iter| iter.next()).collect() - } - } - let par_lock_step_iter = ParallelLockStepIter { iters: rg_col_iter }; - - // Keep track of the current index in the row group so we can throw away arrays that are not needed - // and slice arrays that are partially needed. - let mut index_so_far = 0; - let table_iter = par_lock_step_iter.into_iter().map(move |chunk| { - let chunk = chunk.with_context(|_| { - super::UnableToCreateChunkFromStreamingFileReaderSnafu { - path: owned_uri.clone(), - } - })?; - let all_series = chunk - .into_iter() - .zip(owned_schema_ref.fields.clone()) - .filter_map(|(mut arr, (f_name, _))| { - if (index_so_far + arr.len()) < rg_range.start { - // No need to process arrays that are less than the start offset - return None; - } - if index_so_far < rg_range.start { - // Slice arrays that are partially needed - let offset = rg_range.start.saturating_sub(index_so_far); - arr = arr.sliced(offset, arr.len() - offset); - } - let series_result = - Series::try_from((f_name.as_str(), cast_array_for_daft_if_needed(arr))); - Some(series_result) - }) - .collect::>>()?; - - let len = all_series[0].len(); - if all_series.iter().any(|s| s.len() != len) { - return Err(super::Error::ParquetColumnsDontHaveEqualRows { - path: owned_uri.clone(), - } - .into()); - } - index_so_far += len; - - let mut table = Table::new_with_size(owned_schema_ref.clone(), all_series, len) - .with_context(|_| super::UnableToCreateTableFromChunkSnafu { - path: owned_uri.clone(), - })?; - - // Apply pushdowns if needed - if let Some(predicate) = &owned_predicate { - table = table.filter(&[predicate.clone()])?; - if let Some(oc) = &owned_original_columns { - table = table.get_columns(oc)?; - } - if let Some(nr) = original_num_rows { - table = table.head(nr)?; - } - } - DaftResult::Ok(table) - }); - for table_result in table_iter { let table_err = table_result.is_err(); if let Err(crossbeam_channel::TrySendError::Full(_)) = tx.try_send(table_result) { diff --git a/src/parquet2/src/read/page/stream.rs b/src/parquet2/src/read/page/stream.rs index c5cd3ec245..25fb0fe6fc 100644 --- a/src/parquet2/src/read/page/stream.rs +++ b/src/parquet2/src/read/page/stream.rs @@ -51,7 +51,7 @@ pub async fn get_page_stream_from_column_start<'a, R: AsyncRead + Unpin + Send>( )) } -pub async fn get_owned_page_stream_from_column_start( +pub fn get_owned_page_stream_from_column_start( 
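+    // No longer `async`: the page stream is constructed without awaiting, so the
+    // new streaming call sites in file.rs drop their `.await` on this call.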
column_metadata: &ColumnChunkMetaData, reader: R, scratch: Vec<u8>,
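+    // `scratch` is a reusable page-read buffer; the streaming call sites in file.rs pass an empty `vec![]`.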