
Commit 2c59e26

cleanup
Colin Ho authored and Colin Ho committed Oct 11, 2024
1 parent d133fed commit 2c59e26
Showing 3 changed files with 27 additions and 35 deletions.
29 changes: 13 additions & 16 deletions src/daft-csv/src/read.rs
@@ -310,23 +310,20 @@ async fn read_csv_single_into_table(
         .into_iter()
         .collect::<DaftResult<Vec<_>>>()?;
     // Handle empty table case.
-    let output_table = {
-        if collected_tables.is_empty() {
-            return Table::empty(Some(schema));
-        }
+    if collected_tables.is_empty() {
+        return Table::empty(Some(schema));
+    }
 
-        // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked.
-        let concated_table = tables_concat(collected_tables)?;
-        if let Some(limit) = limit
-            && concated_table.len() > limit
-        {
-            // apply head in case that last chunk went over limit
-            concated_table.head(limit)
-        } else {
-            Ok(concated_table)
-        }
-    }?;
-    Ok(output_table)
+    // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked.
+    let concated_table = tables_concat(collected_tables)?;
+    if let Some(limit) = limit
+        && concated_table.len() > limit
+    {
+        // apply head in case that last chunk went over limit
+        concated_table.head(limit)
+    } else {
+        Ok(concated_table)
+    }
 }
 
 async fn stream_csv_single(
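Both reader hunks apply the same cleanup, shown again for the JSON reader below: the old code built the result inside a "let output_table = { ... }?;" block expression and then re-wrapped it with Ok(output_table); the new code returns early for the empty-table case and lets the limit check stand as the function's tail expression. A minimal, self-contained sketch of the pattern, using illustrative names rather than the Daft API (concat_with_limit is not a Daft function):

fn concat_with_limit(chunks: Vec<Vec<i32>>, limit: Option<usize>) -> Result<Vec<i32>, String> {
    // Early return replaces the nested empty-input branch.
    if chunks.is_empty() {
        return Ok(Vec::new());
    }
    let mut concatenated: Vec<i32> = chunks.into_iter().flatten().collect();
    // Trim in case the last chunk pushed the total past the limit,
    // mirroring the concated_table.head(limit) call in the diff above.
    if let Some(limit) = limit {
        if concatenated.len() > limit {
            concatenated.truncate(limit);
        }
    }
    Ok(concatenated)
}

fn main() {
    assert_eq!(
        concat_with_limit(vec![vec![1, 2], vec![3, 4, 5]], Some(4)),
        Ok(vec![1, 2, 3, 4])
    );
}

Behavior is unchanged; the rewrite only removes a level of nesting and the redundant ?-then-Ok round trip.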
32 changes: 14 additions & 18 deletions src/daft-json/src/read.rs
@@ -271,24 +271,20 @@ async fn read_json_single_into_table(
         .into_iter()
         .collect::<DaftResult<Vec<_>>>()?;
     // Handle empty table case.
-    let output_table = {
-        if collected_tables.is_empty() {
-            let daft_schema = Arc::new(Schema::try_from(&schema)?);
-            return Table::empty(Some(daft_schema));
-        }
-
-        // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked.
-        let concated_table = tables_concat(collected_tables)?;
-        if let Some(limit) = limit
-            && concated_table.len() > limit
-        {
-            // apply head in case that last chunk went over limit
-            concated_table.head(limit)
-        } else {
-            Ok(concated_table)
-        }
-    }?;
-    Ok(output_table)
+    if collected_tables.is_empty() {
+        let daft_schema = Arc::new(Schema::try_from(&schema)?);
+        return Table::empty(Some(daft_schema));
+    }
+    // // TODO(Clark): Don't concatenate all chunks from a file into a single table, since MicroPartition is natively chunked.
+    let concated_table = tables_concat(collected_tables)?;
+    if let Some(limit) = limit
+        && concated_table.len() > limit
+    {
+        // apply head in case that last chunk went over limit
+        concated_table.head(limit)
+    } else {
+        Ok(concated_table)
+    }
 }
 
 pub async fn stream_json(
1 change: 0 additions & 1 deletion src/daft-micropartition/src/micropartition.rs
@@ -495,7 +495,6 @@ fn materialize_scan_task(
         // If there is a partition spec and partition values aren't duplicated in the data, inline the partition values
         // into the table when casting the schema.
         let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
-        println!("fill_map: {:?}", fill_map);
         table_values = table_values
             .iter()
             .map(|tbl| tbl.cast_to_schema_with_fill(cast_to_schema.as_ref(), fill_map.as_ref()))
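The only change in this file deletes a leftover debug print. As a hypothetical alternative (not what this commit does, and assuming the log facade crate with a backend such as env_logger were available), output like this is typically kept behind a leveled log macro so it only appears when explicitly enabled:

use log::debug;

fn main() {
    env_logger::init(); // hypothetical setup; any `log` backend would do
    let fill_map: Option<Vec<(String, i32)>> = None; // stand-in value, not Daft's actual type
    // Emitted only when RUST_LOG=debug (or similar) is set, rather than
    // always printing to stdout like the deleted println!.
    debug!("fill_map: {:?}", fill_map);
}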
