Commit d133fed

include in partition spec
Colin Ho authored and Colin Ho committed Oct 11, 2024
1 parent 945cfc7 commit d133fed
Showing 9 changed files with 21 additions and 104 deletions.
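
The changes shown here all move in one direction: the CSV, JSON, and Parquet read paths (and the streaming scan-task source) drop their file_path_column argument and no longer union a Utf8Array of file paths onto the tables they produce. Per the commit title, the file path is instead expected to arrive as a partition value and get filled in when each table is cast to its target schema via the partition spec's fill map; the code that actually puts the path into the partition spec presumably lives in the changed files whose diffs are not expanded on this page. Below is a minimal sketch of the before/after shape of the change. The helper names and signatures are illustrative assumptions; only the daft API calls themselves (Utf8Array::from_iter, Table::union, Table::from_nonempty_columns, partition_spec().to_fill_map(), cast_to_schema_with_fill) are taken from the diff.

// Sketch only: helper names below are hypothetical; the calls mirror the APIs
// that appear in the removed and kept lines of this diff.
use common_error::DaftResult;
use daft_core::{prelude::Utf8Array, series::IntoSeries};
use daft_scan::ScanTask;
use daft_table::Table;

// Before: each reader appended the file-path column itself.
fn attach_file_path_manually(
    output_table: &Table,
    uri: &str,
    file_path_col_name: &str,
) -> DaftResult<Table> {
    let trimmed = uri.trim_start_matches("file://");
    let file_paths_column = Utf8Array::from_iter(
        file_path_col_name,
        std::iter::repeat(Some(trimmed)).take(output_table.len()),
    )
    .into_series();
    output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)
}

// After: the path is treated as a partition value, so the generic fill path
// adds it while casting the table to the scan task's materialized schema.
fn attach_via_partition_spec(scan_task: &ScanTask, table: &Table) -> DaftResult<Table> {
    let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
    table.cast_to_schema_with_fill(scan_task.materialized_schema().as_ref(), fill_map.as_ref())
}

If that reading is right, the benefit is that every scan path (bulk reads, the streaming executor, micropartition materialization) picks the column up from the same fill-map mechanism instead of each reader duplicating the union logic.
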
24 changes: 1 addition & 23 deletions src/daft-csv/src/read.rs
@@ -63,7 +63,6 @@ pub fn read_csv(
io_client,
io_stats,
max_chunks_in_flight,
None,
)
.await
})
@@ -80,28 +79,18 @@ pub fn read_csv_bulk(
multithreaded_io: bool,
max_chunks_in_flight: Option<usize>,
num_parallel_tasks: usize,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = get_runtime(multithreaded_io)?;
let tables = runtime_handle.block_on_current_thread(async move {
// Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks.
let task_stream = futures::stream::iter(uris.iter().map(|uri| {
let (
uri,
convert_options,
parse_options,
read_options,
io_client,
io_stats,
file_path_column,
) = (
let (uri, convert_options, parse_options, read_options, io_client, io_stats) = (
uri.to_string(),
convert_options.clone(),
parse_options.clone(),
read_options.clone(),
io_client.clone(),
io_stats.clone(),
file_path_column.map(|s| s.to_string()),
);
tokio::task::spawn(async move {
read_csv_single_into_table(
@@ -112,7 +101,6 @@ pub fn read_csv_bulk(
io_client,
io_stats,
max_chunks_in_flight,
file_path_column.as_deref(),
)
.await
})
@@ -220,7 +208,6 @@ async fn read_csv_single_into_table(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let predicate = convert_options
.as_ref()
@@ -339,15 +326,6 @@ async fn read_csv_single_into_table(
Ok(concated_table)
}
}?;
if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
}
Ok(output_table)
}

13 changes: 1 addition & 12 deletions src/daft-json/src/local.rs
@@ -28,7 +28,6 @@ pub fn read_json_local(
parse_options: Option<JsonParseOptions>,
read_options: Option<JsonReadOptions>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let uri = uri.trim_start_matches("file://");
let file = std::fs::File::open(uri)?;
@@ -44,17 +43,7 @@
read_options,
max_chunks_in_flight,
)?;
let output_table = reader.finish()?;
if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
}
Ok(output_table)
reader.finish()
}

struct JsonReader<'a> {
25 changes: 1 addition & 24 deletions src/daft-json/src/read.rs
@@ -56,7 +56,6 @@ pub fn read_json(
io_client,
io_stats,
max_chunks_in_flight,
None,
)
.await
})
@@ -73,28 +72,18 @@ pub fn read_json_bulk(
multithreaded_io: bool,
max_chunks_in_flight: Option<usize>,
num_parallel_tasks: usize,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = get_runtime(multithreaded_io)?;
let tables = runtime_handle.block_on_current_thread(async move {
// Launch a read task per URI, throttling the number of concurrent file reads to num_parallel tasks.
let task_stream = futures::stream::iter(uris.iter().map(|uri| {
let (
uri,
convert_options,
parse_options,
read_options,
io_client,
io_stats,
file_path_column,
) = (
let (uri, convert_options, parse_options, read_options, io_client, io_stats) = (
uri.to_string(),
convert_options.clone(),
parse_options.clone(),
read_options.clone(),
io_client.clone(),
io_stats.clone(),
file_path_column.map(|s| s.to_string()),
);
tokio::task::spawn(async move {
let table = read_json_single_into_table(
@@ -105,7 +94,6 @@ pub fn read_json_bulk(
io_client,
io_stats,
max_chunks_in_flight,
file_path_column.as_deref(),
)
.await?;
DaftResult::Ok(table)
@@ -189,7 +177,6 @@ async fn read_json_single_into_table(
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let (source_type, fixed_uri) = parse_url(uri)?;
let is_compressed = CompressionCodec::from_uri(uri).is_some();
@@ -200,7 +187,6 @@ async fn read_json_single_into_table(
parse_options,
read_options,
max_chunks_in_flight,
file_path_column,
);
}

@@ -302,15 +288,6 @@ async fn read_json_single_into_table(
Ok(concated_table)
}
}?;
if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(output_table.len()),
)
.into_series();
return output_table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
}
Ok(output_table)
}

18 changes: 1 addition & 17 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -2,14 +2,12 @@ use std::sync::Arc;

use common_error::DaftResult;
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
use daft_core::{prelude::Utf8Array, series::IntoSeries};
use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use daft_io::IOStatsRef;
use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
use daft_micropartition::MicroPartition;
use daft_parquet::read::ParquetSchemaInferenceOptions;
use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
use daft_table::Table;
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::instrument;
@@ -277,22 +275,8 @@ async fn stream_scan_task(
}
};

let url = if scan_task.file_path_column.is_some() {
Some(url.to_string())
} else {
None
};
Ok(table_stream.map(move |table| {
let mut table = table?;
if let Some(file_path_col_name) = scan_task.file_path_column.as_ref() {
let trimmed = url.as_ref().unwrap().trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(table.len()),
)
.into_series();
table = table.union(&Table::from_nonempty_columns(vec![file_paths_column])?)?;
}
let table = table?;
let casted_table = table.cast_to_schema_with_fill(
scan_task.materialized_schema().as_ref(),
scan_task
12 changes: 1 addition & 11 deletions src/daft-micropartition/src/micropartition.rs
@@ -181,7 +181,6 @@ fn materialize_scan_task(
metadatas,
Some(delete_map),
*chunk_size,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -236,7 +235,6 @@ fn materialize_scan_task(
native_storage_config.multithreaded_io,
None,
8,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -267,7 +265,6 @@ fn materialize_scan_task(
native_storage_config.multithreaded_io,
None,
8,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)?
}
@@ -498,7 +495,7 @@ fn materialize_scan_task(
// If there is a partition spec and partition values aren't duplicated in the data, inline the partition values
// into the table when casting the schema.
let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());

println!("fill_map: {:?}", fill_map);
table_values = table_values
.iter()
.map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref()))
@@ -866,7 +863,6 @@ pub(crate) fn read_csv_into_micropartition(
multithreaded_io,
None,
8,
None,
)
.context(DaftCoreComputeSnafu)?;

@@ -916,7 +912,6 @@ pub(crate) fn read_json_into_micropartition(
multithreaded_io,
None,
8,
None,
)
.context(DaftCoreComputeSnafu)?;

@@ -993,7 +988,6 @@ fn _read_delete_files(
None,
None,
None,
None,
)?;

let mut delete_map: HashMap<String, Vec<i64>> =
@@ -1039,7 +1033,6 @@ fn _read_parquet_into_loaded_micropartition<T: AsRef<str>>(
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
let delete_map = iceberg_delete_files
.map(|files| {
@@ -1074,7 +1067,6 @@ fn _read_parquet_into_loaded_micropartition<T: AsRef<str>>(
None,
delete_map,
chunk_size,
file_path_column,
)?;

// Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
@@ -1162,7 +1154,6 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
catalog_provided_schema,
field_id_mapping,
chunk_size,
file_path_column,
);
}
let runtime_handle = get_runtime(multithreaded_io)?;
@@ -1346,7 +1337,6 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
catalog_provided_schema,
field_id_mapping,
chunk_size,
file_path_column,
)
}
}
1 change: 0 additions & 1 deletion src/daft-parquet/src/python.rs
@@ -170,7 +170,6 @@ pub mod pylib {
None,
None,
None,
None,
)?
.into_iter()
.map(|v| v.into())
15 changes: 0 additions & 15 deletions src/daft-parquet/src/read.rs
@@ -156,7 +156,6 @@ async fn read_parquet_single(
metadata: Option<Arc<FileMetaData>>,
delete_rows: Option<Vec<i64>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<Table> {
let field_id_mapping_provided = field_id_mapping.is_some();
let mut columns_to_read = columns.clone();
@@ -357,16 +356,6 @@ async fn read_parquet_single(
.into());
}

if let Some(file_path_col_name) = file_path_column {
let trimmed = uri.trim_start_matches("file://");
let file_paths_column = Utf8Array::from_iter(
file_path_col_name,
std::iter::repeat(Some(trimmed)).take(table.len()),
)
.into_series();
return table.union(&Table::from_nonempty_columns(vec![file_paths_column])?);
}

Ok(table)
}

@@ -694,7 +683,6 @@ pub fn read_parquet(
metadata,
None,
None,
None,
)
.await
})
@@ -763,7 +751,6 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
metadata: Option<Vec<Arc<FileMetaData>>>,
delete_map: Option<HashMap<String, Vec<i64>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;

@@ -791,7 +778,6 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
let schema_infer_options = *schema_infer_options;
let owned_field_id_mapping = field_id_mapping.clone();
let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned());
let owned_file_path_column = file_path_column.map(|s| s.to_string());
tokio::task::spawn(async move {
read_parquet_single(
&uri,
Expand All @@ -807,7 +793,6 @@ pub fn read_parquet_bulk<T: AsRef<str>>(
metadata,
delete_rows,
chunk_size,
owned_file_path_column.as_deref(),
)
.await
})