[FEAT] Remote parquet streaming (#2620)
Adds streaming reads for remote Parquet files.

The algorithm is similar to the one for local Parquet files: read bytes
into memory -> get an arrow chunk iterator -> emit a table per chunk.
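
For illustration, here is a minimal, self-contained sketch of that three-step pattern. It uses stand-in types (`Chunk`, `Table`) and plain byte chunking instead of Daft's real arrow2 decoders and `Table` type, so it is a sketch of the idea rather than the actual implementation:

```rust
/// Stand-in for a decoded arrow chunk (the real reader yields arrow2 chunks).
struct Chunk(Vec<u8>);

/// Stand-in for a Daft Table built from a single chunk.
struct Table {
    num_rows: usize,
}

/// Step 1: the bytes for a row group have already been read into memory.
/// Step 2: walk them as fixed-size chunks (the PR uses a CHUNK_SIZE of 2048;
///         here we chunk bytes instead of rows to keep the sketch small).
/// Step 3: emit one table per chunk instead of materializing the whole file.
fn stream_tables(bytes: Vec<u8>, chunk_size: usize) -> Vec<Table> {
    bytes
        .chunks(chunk_size)
        .map(|c| Chunk(c.to_vec()))
        .map(|chunk| Table { num_rows: chunk.0.len() })
        .collect()
}

fn main() {
    // Each table can be handed to downstream consumers as soon as it is
    // produced, rather than waiting for the entire file to be decoded.
    for table in stream_tables(vec![0u8; 10_000], 2048) {
        println!("emitted table with {} rows", table.num_rows);
    }
}
```

In the actual change, the per-column chunk iterators come from `column_iter_to_arrays`, `arrow_column_iters_to_table_iter` turns them into a table iterator, and the resulting tables are sent over bounded crossbeam channels that are flattened into the output stream.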

Q6 Memory Profile:

Streaming:
<img width="1111" alt="Screenshot 2024-08-07 at 12 38 18 PM" src="https://github.com/user-attachments/assets/f6d1f7d0-ede7-4fb6-9a13-7d07275c2158">

Bulk:
<img width="1100" alt="Screenshot 2024-08-07 at 12 38 41 PM" src="https://github.com/user-attachments/assets/9c25380f-6cfc-4410-b551-02a0a053bac3">

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Aug 9, 2024
1 parent 8cf6974 commit 48632c6
Showing 4 changed files with 299 additions and 101 deletions.
171 changes: 156 additions & 15 deletions src/daft-parquet/src/file.rs
@@ -24,7 +24,9 @@ use crate::{
metadata::read_parquet_metadata,
read::ParquetSchemaInferenceOptions,
read_planner::{CoalescePass, RangesContainer, ReadPlanner, SplitLargeRequestPass},
statistics, JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
statistics,
stream_reader::arrow_column_iters_to_table_iter,
JoinSnafu, OneShotRecvSnafu, UnableToConvertRowGroupMetadataToStatsSnafu,
UnableToConvertSchemaToDaftSnafu, UnableToCreateParquetPageStreamSnafu,
UnableToParseSchemaFromMetadataSnafu, UnableToRunExpressionOnStatsSnafu,
};
@@ -298,6 +300,12 @@ pub(crate) struct ParquetFileReader {
}

impl ParquetFileReader {
const CHUNK_SIZE: usize = 2048;
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
const MAX_HEADER_SIZE: usize = 256 * 1024 * 1024;

fn new(
uri: String,
metadata: parquet2::metadata::FileMetaData,
@@ -371,9 +379,150 @@ impl ParquetFileReader {

pub async fn read_from_ranges_into_table_stream(
self,
_ranges: Arc<RangesContainer>,
) -> BoxStream<'static, DaftResult<Table>> {
todo!("Implement streaming reads for remote parquet files")
ranges: Arc<RangesContainer>,
maintain_order: bool,
predicate: Option<ExprRef>,
original_columns: Option<Vec<String>>,
original_num_rows: Option<usize>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let daft_schema = Arc::new(daft_core::schema::Schema::try_from(
self.arrow_schema.as_ref(),
)?);

let (senders, receivers): (Vec<_>, Vec<_>) = self
.row_ranges
.iter()
.map(|rg_range| {
let expected_num_chunks =
f32::ceil(rg_range.num_rows as f32 / Self::CHUNK_SIZE as f32) as usize;
crossbeam_channel::bounded(expected_num_chunks)
})
.unzip();

let table_iter_handles =
self.row_ranges
.iter()
.zip(senders.into_iter())
.map(|(row_range, sender)| {
let uri = self.uri.clone();
let metadata = self.metadata.clone();
let arrow_schema = self.arrow_schema.clone();
let daft_schema = daft_schema.clone();
let ranges = ranges.clone();
let predicate = predicate.clone();
let original_columns = original_columns.clone();
let row_range = *row_range;

tokio::task::spawn(async move {
let arr_iter_handles = arrow_schema.fields.iter().map(|field| {
let rt_handle = tokio::runtime::Handle::current();
let ranges = ranges.clone();
let uri = uri.clone();
let field = field.clone();
let metadata = metadata.clone();

tokio::task::spawn(async move {
let rg = metadata
.row_groups
.get(row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows =
rg.num_rows().min(row_range.start + row_range.num_rows);
let filtered_columns = rg
.columns()
.iter()
.filter(|x| x.descriptor().path_in_schema[0] == field.name)
.collect::<Vec<_>>();
let mut decompressed_iters =
Vec::with_capacity(filtered_columns.len());
let mut ptypes = Vec::with_capacity(filtered_columns.len());
let mut num_values = Vec::with_capacity(filtered_columns.len());
for col in filtered_columns.into_iter() {
num_values.push(col.metadata().num_values as usize);
ptypes.push(col.descriptor().descriptor.primitive_type.clone());

let byte_range = {
let (start, len) = col.byte_range();
let end: u64 = start + len;
start as usize..end as usize
};
let range_reader =
Box::pin(ranges.get_range_reader(byte_range).await?);
let compressed_page_stream =
get_owned_page_stream_from_column_start(
col,
range_reader,
vec![],
Arc::new(|_, _| true),
Self::MAX_HEADER_SIZE,
)
.with_context(
|_| UnableToCreateParquetPageStreamSnafu::<String> {
path: uri.clone(),
},
)?;
let page_stream =
streaming_decompression(compressed_page_stream);
let pinned_stream = Box::pin(page_stream);
decompressed_iters
.push(StreamIterator::new(pinned_stream, rt_handle.clone()))
}
let arr_iter = column_iter_to_arrays(
decompressed_iters,
ptypes.iter().collect(),
field,
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
)?;
Ok(arr_iter)
})
});

let arr_iters = try_join_all(arr_iter_handles)
.await
.context(JoinSnafu { path: uri.clone() })?
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let table_iter = arrow_column_iters_to_table_iter(
arr_iters,
row_range.start,
daft_schema,
uri,
predicate,
original_columns,
original_num_rows,
);
rayon::spawn(move || {
for table_result in table_iter {
let is_err = table_result.is_err();
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table_result)
{
panic!("Parquet stream channel should not be full")
}
if is_err {
break;
}
}
});
Ok(())
})
});

let _ = try_join_all(table_iter_handles)
.await
.context(JoinSnafu { path: self.uri })?
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let combined_stream =
futures::stream::iter(receivers.into_iter().map(futures::stream::iter));
match maintain_order {
true => Ok(Box::pin(combined_stream.flatten())),
false => Ok(Box::pin(combined_stream.flatten_unordered(None))),
}
}

pub async fn read_from_ranges_into_table(
@@ -454,12 +603,8 @@ impl ParquetFileReader {
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
@@ -477,7 +622,7 @@ impl ParquetFileReader {
decompressed_iters,
ptypes.iter().collect(),
field.clone(),
Some(2048),
Some(Self::CHUNK_SIZE),
num_rows,
num_values,
);
@@ -641,12 +786,8 @@ impl ParquetFileReader {
range_reader,
vec![],
Arc::new(|_, _| true),
// Set to a very high number 256MB to guard against unbounded large
// downloads from remote storage, which likely indicates corrupted Parquet data
// See: https://github.com/Eventual-Inc/Daft/issues/1551
256 * 1024 * 1024,
Self::MAX_HEADER_SIZE,
)
.await
.with_context(|_| {
UnableToCreateParquetPageStreamSnafu::<String> {
path: owned_uri.clone(),
51 changes: 48 additions & 3 deletions src/daft-parquet/src/read.rs
@@ -410,8 +410,14 @@ async fn stream_parquet_single(
Ok((
Arc::new(metadata),
parquet_reader
.read_from_ranges_into_table_stream(ranges)
.await,
.read_from_ranges_into_table_stream(
ranges,
maintain_order,
predicate.clone(),
columns_to_return,
num_rows_to_return,
)
.await?,
))
}?;

@@ -1014,11 +1020,15 @@ mod tests {
use common_error::DaftResult;

use daft_io::{IOClient, IOConfig};
use futures::StreamExt;

use super::read_parquet;
use super::stream_parquet;

const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
#[test]
fn test_parquet_read_from_s3() -> DaftResult<()> {
let file = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;
@@ -1043,4 +1053,39 @@

Ok(())
}

#[test]
fn test_parquet_streaming_read_from_s3() -> DaftResult<()> {
let file = PARQUET_FILE;

let mut io_config = IOConfig::default();
io_config.s3.anonymous = true;

let io_client = Arc::new(IOClient::new(io_config.into())?);
let runtime_handle = daft_io::get_runtime(true)?;
runtime_handle.block_on(async move {
let tables = stream_parquet(
file,
None,
None,
None,
None,
None,
io_client,
None,
&Default::default(),
None,
None,
false,
)
.await?
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;
let total_tables_len = tables.iter().map(|t| t.len()).sum::<usize>();
assert_eq!(total_tables_len, 100);
Ok(())
})
}
}