From d133fed79cb80a8f719943dbf36026748ab2f7f3 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 11 Oct 2024 10:10:42 -0700 Subject: [PATCH] include in partition spec --- src/daft-csv/src/read.rs | 24 +----------------- src/daft-json/src/local.rs | 13 +--------- src/daft-json/src/read.rs | 25 +------------------ .../src/sources/scan_task.rs | 18 +------------ src/daft-micropartition/src/micropartition.rs | 12 +-------- src/daft-parquet/src/python.rs | 1 - src/daft-parquet/src/read.rs | 15 ----------- src/daft-scan/src/glob.rs | 15 ++++++++++- src/daft-table/src/lib.rs | 2 ++ 9 files changed, 21 insertions(+), 104 deletions(-) diff --git a/src/daft-csv/src/read.rs b/src/daft-csv/src/read.rs index 60e9a50e36..5216d29f39 100644 --- a/src/daft-csv/src/read.rs +++ b/src/daft-csv/src/read.rs @@ -63,7 +63,6 @@ pub fn read_csv( io_client, io_stats, max_chunks_in_flight, - None, ) .await }) @@ -80,28 +79,18 @@ pub fn read_csv_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let ( - uri, - convert_options, - parse_options, - read_options, - io_client, - io_stats, - file_path_column, - ) = ( + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { read_csv_single_into_table( @@ -112,7 +101,6 @@ pub fn read_csv_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column.as_deref(), ) .await }) @@ -220,7 +208,6 @@ async fn read_csv_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult { let predicate = convert_options .as_ref() @@ -339,15 +326,6 @@ async fn read_csv_single_into_table( Ok(concated_table) } }?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } Ok(output_table) } diff --git a/src/daft-json/src/local.rs b/src/daft-json/src/local.rs index 61562e764c..224c94f24f 100644 --- a/src/daft-json/src/local.rs +++ b/src/daft-json/src/local.rs @@ -28,7 +28,6 @@ pub fn read_json_local( parse_options: Option, read_options: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let uri = uri.trim_start_matches("file://"); let file = std::fs::File::open(uri)?; @@ -44,17 +43,7 @@ pub fn read_json_local( read_options, max_chunks_in_flight, )?; - let output_table = reader.finish()?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } - Ok(output_table) + reader.finish() } struct JsonReader<'a> { diff --git a/src/daft-json/src/read.rs b/src/daft-json/src/read.rs index 007f530109..de7b153b90 100644 --- a/src/daft-json/src/read.rs +++ b/src/daft-json/src/read.rs @@ -56,7 +56,6 @@ pub fn read_json( io_client, io_stats, max_chunks_in_flight, - None, ) .await }) @@ -73,28 +72,18 @@ pub fn read_json_bulk( multithreaded_io: bool, max_chunks_in_flight: Option, num_parallel_tasks: usize, - file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; let tables = runtime_handle.block_on_current_thread(async move { // Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks. let task_stream = futures::stream::iter(uris.iter().map(|uri| { - let ( - uri, - convert_options, - parse_options, - read_options, - io_client, - io_stats, - file_path_column, - ) = ( + let (uri, convert_options, parse_options, read_options, io_client, io_stats) = ( uri.to_string(), convert_options.clone(), parse_options.clone(), read_options.clone(), io_client.clone(), io_stats.clone(), - file_path_column.map(|s| s.to_string()), ); tokio::task::spawn(async move { let table = read_json_single_into_table( @@ -105,7 +94,6 @@ pub fn read_json_bulk( io_client, io_stats, max_chunks_in_flight, - file_path_column.as_deref(), ) .await?; DaftResult::Ok(table) @@ -189,7 +177,6 @@ async fn read_json_single_into_table( io_client: Arc, io_stats: Option, max_chunks_in_flight: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let (source_type, fixed_uri) = parse_url(uri)?; let is_compressed = CompressionCodec::from_uri(uri).is_some(); @@ -200,7 +187,6 @@ async fn read_json_single_into_table( parse_options, read_options, max_chunks_in_flight, - file_path_column, ); } @@ -302,15 +288,6 @@ async fn read_json_single_into_table( Ok(concated_table) } }?; - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(output_table.len()), - ) - .into_series(); - return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } Ok(output_table) } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 51101ce56d..5b9f95d96e 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -2,14 +2,12 @@ use std::sync::Arc; use common_error::DaftResult; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; -use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; -use daft_table::Table; use futures::{Stream, StreamExt}; use tokio_stream::wrappers::ReceiverStream; use tracing::instrument; @@ -277,22 +275,8 @@ async fn stream_scan_task( } }; - let url = if scan_task.file_path_column.is_some() { - Some(url.to_string()) - } else { - None - }; Ok(table_stream.map(move |table| { - let mut table = table?; - if let Some(file_path_col_name) = scan_task.file_path_column.as_ref() { - let trimmed = url.as_ref().unwrap().trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(table.len()), - ) - .into_series(); - table = table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)?; - } + let table = table?; let casted_table = table.cast_to_schema_with_fill( scan_task.materialized_schema().as_ref(), scan_task diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 8ab12f7f5a..68224d85cf 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -181,7 +181,6 @@ fn materialize_scan_task( metadatas, Some(delete_map), *chunk_size, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -236,7 +235,6 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -267,7 +265,6 @@ fn materialize_scan_task( native_storage_config.multithreaded_io, None, 8, - scan_task.file_path_column.as_deref(), ) .context(DaftCoreComputeSnafu)? } @@ -498,7 +495,7 @@ fn materialize_scan_task( // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values // into the table when casting the schema. 
let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map()); - + println!("fill_map: {:?}", fill_map); table_values = table_values .iter() .map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref())) @@ -866,7 +863,6 @@ pub(crate) fn read_csv_into_micropartition( multithreaded_io, None, 8, - None, ) .context(DaftCoreComputeSnafu)?; @@ -916,7 +912,6 @@ pub(crate) fn read_json_into_micropartition( multithreaded_io, None, 8, - None, ) .context(DaftCoreComputeSnafu)?; @@ -993,7 +988,6 @@ fn _read_delete_files( None, None, None, - None, )?; let mut delete_map: HashMap> = @@ -1039,7 +1033,6 @@ fn _read_parquet_into_loaded_micropartition>( catalog_provided_schema: Option, field_id_mapping: Option>>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult { let delete_map = iceberg_delete_files .map(|files| { @@ -1074,7 +1067,6 @@ fn _read_parquet_into_loaded_micropartition>( None, delete_map, chunk_size, - file_path_column, )?; // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files @@ -1162,7 +1154,6 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, - file_path_column, ); } let runtime_handle = get_runtime(multithreaded_io)?; @@ -1346,7 +1337,6 @@ pub(crate) fn read_parquet_into_micropartition>( catalog_provided_schema, field_id_mapping, chunk_size, - file_path_column, ) } } diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index d9616b0978..2d965053c2 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -170,7 +170,6 @@ pub mod pylib { None, None, None, - None, )? .into_iter() .map(|v| v.into()) diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index e3eac7f627..3b6c498cf6 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -156,7 +156,6 @@ async fn read_parquet_single( metadata: Option>, delete_rows: Option>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult
{ let field_id_mapping_provided = field_id_mapping.is_some(); let mut columns_to_read = columns.clone(); @@ -357,16 +356,6 @@ async fn read_parquet_single( .into()); } - if let Some(file_path_col_name) = file_path_column { - let trimmed = uri.trim_start_matches("file://"); - let file_paths_column = Utf8Array::from_iter( - file_path_col_name, - std::iter::repeat(Some(trimmed)).take(table.len()), - ) - .into_series(); - return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?); - } - Ok(table) } @@ -694,7 +683,6 @@ pub fn read_parquet( metadata, None, None, - None, ) .await }) @@ -763,7 +751,6 @@ pub fn read_parquet_bulk>( metadata: Option>>, delete_map: Option>>, chunk_size: Option, - file_path_column: Option<&str>, ) -> DaftResult> { let runtime_handle = daft_io::get_runtime(multithreaded_io)?; @@ -791,7 +778,6 @@ pub fn read_parquet_bulk>( let schema_infer_options = *schema_infer_options; let owned_field_id_mapping = field_id_mapping.clone(); let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); - let owned_file_path_column = file_path_column.map(|s| s.to_string()); tokio::task::spawn(async move { read_parquet_single( &uri, @@ -807,7 +793,6 @@ pub fn read_parquet_bulk>( metadata, delete_rows, chunk_size, - owned_file_path_column.as_deref(), ) .await }) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 56f5cbb5c5..28c1ca1686 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,10 +2,13 @@ use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use common_file_formats::{CsvSourceConfig, FileFormat, FileFormatConfig, ParquetSourceConfig}; +use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef, RuntimeRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; use daft_schema::schema::SchemaRef; +use daft_stats::PartitionSpec; +use daft_table::Table; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::Snafu; @@ -346,6 +349,16 @@ impl ScanOperator for GlobScanOperator { size: size_bytes, .. } = f?; + let partition_spec = if let Some(fp_col) = &file_path_column { + let trimmed = path.trim_start_matches("file://"); + let file_paths_column_series = + Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))).into_series(); + Some(PartitionSpec { + keys: Table::from_nonempty_columns(vec![file_paths_column_series])?, + }) + } else { + None + }; let row_group = row_groups .as_ref() .and_then(|rgs| rgs.get(idx).cloned()) @@ -358,7 +371,7 @@ impl ScanOperator for GlobScanOperator { size_bytes, iceberg_delete_files: None, metadata: None, - partition_spec: None, + partition_spec, statistics: None, parquet_metadata: None, }], diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index 3669fda3f5..ff9f9c0857 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -647,7 +647,9 @@ impl Table { schema: &Schema, fill_map: Option<&HashMap<&str, ExprRef>>, ) -> DaftResult { + println!("schema: {:?}", schema); let current_col_names = HashSet::<_>::from_iter(self.column_names()); + println!("current_col_names: {:?}", current_col_names); let null_lit = null_lit(); let exprs: Vec<_> = schema .fields
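
Note on the approach: rather than having each of the CSV, JSON, and Parquet readers union a file-path column onto every table it produces, the glob scan operator now attaches a one-row PartitionSpec keyed by the file path to each ScanTask, and the existing partition-value fill path (`pspec.to_fill_map()` feeding `cast_to_schema_with_fill`) broadcasts that value into the materialized tables. The sketch below illustrates only the fill mechanism with a toy string-only table; the `Table` struct, the column names `a`/`path`, and the file path literal are simplified stand-ins for illustration, not the real daft-table types.

use std::collections::HashMap;

// A toy "table": named string columns of equal length (illustrative only).
#[derive(Debug)]
struct Table {
    columns: Vec<(String, Vec<String>)>,
}

impl Table {
    fn len(&self) -> usize {
        self.columns.first().map(|(_, v)| v.len()).unwrap_or(0)
    }

    // Cast to a target schema, filling any column missing from the data by
    // broadcasting a single value from `fill_map` to the table's length --
    // the same idea as inlining the file path at schema-cast time instead
    // of unioning an extra column inside every reader.
    fn cast_to_schema_with_fill(
        &self,
        schema: &[&str],
        fill_map: Option<&HashMap<&str, String>>,
    ) -> Table {
        let n = self.len();
        let columns = schema
            .iter()
            .map(|name| {
                // Columns that were actually read from the file are kept as-is.
                if let Some((_, vals)) =
                    self.columns.iter().find(|(c, _)| c.as_str() == *name)
                {
                    return (name.to_string(), vals.clone());
                }
                // Missing columns are broadcast from the fill map (e.g. the file path).
                let fill = fill_map
                    .and_then(|m| m.get(*name).cloned())
                    .unwrap_or_default(); // the real code falls back to a null literal
                (name.to_string(), vec![fill; n])
            })
            .collect();
        Table { columns }
    }
}

fn main() {
    // Columns read from one file; the file path itself is not in the data.
    let data = Table {
        columns: vec![("a".to_string(), vec!["1".to_string(), "2".to_string()])],
    };
    // One partition-spec entry per scan task: column name -> file path.
    let mut fill_map = HashMap::new();
    fill_map.insert("path", "/data/part-0.parquet".to_string());

    let out = data.cast_to_schema_with_fill(&["a", "path"], Some(&fill_map));
    println!("{:?}", out);
}

Because the file path rides along as an ordinary partition-spec value, the CSV, JSON, and Parquet readers no longer need a `file_path_column` parameter, and the same code path that already inlines partition values when casting to the materialized schema handles it.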