Merge branch main into colin/swordfish-iceberg-mor
Colin Ho authored and Colin Ho committed Oct 12, 2024
2 parents 33d6d31 + c694c9e commit 252a330
Showing 23 changed files with 505 additions and 57 deletions.
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
@@ -782,6 +782,7 @@ class ScanOperatorHandle:
storage_config: StorageConfig,
infer_schema: bool,
schema: PySchema | None = None,
file_path_column: str | None = None,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
3 changes: 3 additions & 0 deletions daft/io/_csv.py
@@ -30,6 +30,7 @@ def read_csv(
comment: Optional[str] = None,
allow_variable_columns: bool = False,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -54,6 +55,7 @@ def read_csv(
comment (str): Character to treat as the start of a comment line, or None to not support comments
allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
@@ -97,5 +99,6 @@ def read_csv(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
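
The file_path_column argument added above is threaded from each read API down to the scan. A minimal usage sketch (the glob pattern and column name here are illustrative, not taken from the commit; assumes a local daft install):

    import daft

    # Read a set of CSV files and record, for every row, which file it came from.
    # "source_file" is an arbitrary name; it only needs to not collide with an existing column.
    df = daft.read_csv("data/*.csv", file_path_column="source_file")

    # Each row now carries its originating file path as a Utf8 value in "source_file",
    # alongside the columns parsed from the CSVs.
    df.show()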
3 changes: 3 additions & 0 deletions daft/io/_json.py
@@ -23,6 +23,7 @@ def read_json(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
@@ -41,6 +42,7 @@
infer_schema (bool): Whether to infer the schema of the JSON, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
@@ -74,5 +76,6 @@ def read_json(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_parquet.py
@@ -24,6 +24,7 @@ def read_parquet(
infer_schema: bool = True,
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
@@ -45,6 +46,7 @@
infer_schema (bool): Whether to infer the schema of the Parquet, defaults to True.
schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
@@ -93,5 +95,6 @@ def read_parquet(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
)
return DataFrame(builder)
2 changes: 2 additions & 0 deletions daft/io/common.py
@@ -23,6 +23,7 @@ def get_tabular_files_scan(
schema: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
file_path_column: str | None = None,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
@@ -40,6 +41,7 @@
storage_config,
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
file_path_column=file_path_column,
)

builder = LogicalPlanBuilder.from_tabular_scan(
3 changes: 2 additions & 1 deletion daft/logical/schema.py
@@ -8,7 +8,7 @@
from daft.daft import read_csv_schema as _read_csv_schema
from daft.daft import read_json_schema as _read_json_schema
from daft.daft import read_parquet_schema as _read_parquet_schema
from daft.datatype import DataType, TimeUnit
from daft.datatype import DataType, TimeUnit, _ensure_registered_super_ext_type

if TYPE_CHECKING:
import pyarrow as pa
@@ -82,6 +82,7 @@ def to_pyarrow_schema(self) -> pa.Schema:
Returns:
pa.Schema: PyArrow schema that corresponds to the provided Daft schema
"""
_ensure_registered_super_ext_type()
return self._schema.to_pyarrow_schema()

@classmethod
2 changes: 2 additions & 0 deletions daft/series.py
@@ -213,6 +213,8 @@ def to_arrow(self) -> pa.Array:
"""
Convert this Series to an pyarrow array.
"""
_ensure_registered_super_ext_type()

dtype = self.datatype()
arrow_arr = self._series.to_arrow()

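
The call added above makes sure Daft's extension ("super") type is registered with pyarrow before any conversion happens, so Series.to_arrow keeps working for extension-typed data without extra user-side setup. A small sketch of the user-facing call (illustrative values; assumes a local daft and pyarrow install):

    import daft

    # Build a small Series and convert it; the registration call shown in the diff
    # runs inside to_arrow(), so nothing else is needed here.
    s = daft.Series.from_pylist([1, 2, 3], name="nums")
    arrow_arr = s.to_arrow()
    print(arrow_arr.type)  # expected: int64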
1 change: 1 addition & 0 deletions docs/source/api_docs/expressions.rst
@@ -220,6 +220,7 @@ List
Expression.list.slice
Expression.list.chunk
Expression.list.sort
Expression.list.value_counts

Struct
######
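
Expression.list.value_counts, added to the API listing above, is the list-column counterpart of a value-count aggregation, presumably yielding per-row counts of the distinct list elements. A short sketch of how it could be used (example data invented here):

    import daft
    from daft import col

    df = daft.from_pydict({"letters": [["a", "b", "a"], ["b", "c", "b", "b"]]})

    # For each row, count occurrences of every element in the list.
    df = df.select(col("letters").list.value_counts().alias("letter_counts"))
    df.show()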
1 change: 1 addition & 0 deletions src/daft-csv/src/read.rs
@@ -199,6 +199,7 @@ fn tables_concat(mut tables: Vec<Table>) -> DaftResult<Table> {
)
}

#[allow(clippy::too_many_arguments)]
async fn read_csv_single_into_table(
uri: &str,
convert_options: Option<CsvConvertOptions>,
3 changes: 3 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -644,6 +644,7 @@ impl MicroPartition {
field_id_mapping.clone(),
parquet_metadata,
chunk_size,
scan_task.file_path_column.as_deref(),
)
.context(DaftCoreComputeSnafu)
}
@@ -1121,6 +1122,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
@@ -1308,6 +1310,7 @@
}),
num_rows,
),
file_path_column.map(|s| s.to_string()),
);

let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
1 change: 1 addition & 0 deletions src/daft-micropartition/src/ops/cast_to_schema.rs
@@ -28,6 +28,7 @@ impl MicroPartition {
schema,
scan_task.storage_config.clone(),
scan_task.pushdowns.clone(),
scan_task.file_path_column.clone(),
))
};
Ok(Self::new_unloaded(
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -619,6 +619,7 @@ impl PyMicroPartition {
None,
None,
None,
None,
)
})?;
Ok(mp.into())
@@ -666,6 +667,7 @@ impl PyMicroPartition {
None,
None,
chunk_size,
None,
)
})?;
Ok(mp.into())
3 changes: 1 addition & 2 deletions src/daft-parquet/src/file.rs
@@ -736,10 +736,9 @@ impl ParquetFileReader {
})?
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;
let daft_schema = daft_core::prelude::Schema::try_from(self.arrow_schema.as_ref())?;

Table::new_with_size(
daft_schema,
Schema::new(all_series.iter().map(|s| s.field().clone()).collect())?,
all_series,
self.row_ranges.as_ref().iter().map(|rr| rr.num_rows).sum(),
)
8 changes: 6 additions & 2 deletions src/daft-parquet/src/stream_reader.rs
@@ -113,8 +113,12 @@ pub fn arrow_column_iters_to_table_iter(
return Err(super::Error::ParquetColumnsDontHaveEqualRows { path: uri.clone() }.into());
}

let mut table = Table::new_with_size(owned_schema_ref.clone(), all_series, len)
.with_context(|_| super::UnableToCreateTableFromChunkSnafu { path: uri.clone() })?;
let mut table = Table::new_with_size(
Schema::new(all_series.iter().map(|s| s.field().clone()).collect())?,
all_series,
len,
)
.with_context(|_| super::UnableToCreateTableFromChunkSnafu { path: uri.clone() })?;

// Apply delete rows if needed
if let Some(delete_rows) = &delete_rows
65 changes: 49 additions & 16 deletions src/daft-plan/src/builder.rs
@@ -19,6 +19,7 @@ use daft_scan::{
PhysicalScanInfo, Pushdowns, ScanOperatorRef,
};
use daft_schema::{
dtype::DataType,
field::Field,
schema::{Schema, SchemaRef},
};
@@ -146,7 +147,7 @@ impl LogicalPlanBuilder {
io_config: Option<IOConfig>,
multithreaded_io: bool,
) -> DaftResult<Self> {
use daft_scan::storage_config::PyStorageConfig;
use daft_scan::storage_config::{NativeStorageConfig, PyStorageConfig, StorageConfig};

Python::with_gil(|py| {
let io_config = io_config.unwrap_or_default();
@@ -194,21 +195,45 @@
pushdowns.clone().unwrap_or_default(),
));
// If column selection (projection) pushdown is specified, prune unselected columns from the schema.
let output_schema = if let Some(Pushdowns {
columns: Some(columns),
..
}) = &pushdowns
&& columns.len() < schema.fields.len()
{
let pruned_upstream_schema = schema
.fields
.iter()
.filter(|&(name, _)| columns.contains(name))
.map(|(_, field)| field.clone())
.collect::<Vec<_>>();
Arc::new(Schema::new(pruned_upstream_schema)?)
} else {
schema
// If file path column is specified, add it to the schema.
let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) {
(
Some(Pushdowns {
columns: Some(columns),
..
}),
file_path_column_opt,
) if columns.len() < schema.fields.len() => {
let pruned_fields = schema
.fields
.iter()
.filter(|(name, _)| columns.contains(name))
.map(|(_, field)| field.clone());

let finalized_fields = match file_path_column_opt {
Some(file_path_column) => pruned_fields
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>(),
None => pruned_fields.collect::<Vec<_>>(),
};
Arc::new(Schema::new(finalized_fields)?)
}
(None, Some(file_path_column)) => {
let schema_with_file_path = schema
.fields
.values()
.cloned()
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>();
Arc::new(Schema::new(schema_with_file_path)?)
}
_ => schema,
};
let logical_plan: LogicalPlan =
logical_ops::Source::new(output_schema, source_info.into()).into();
@@ -585,6 +610,7 @@ pub struct ParquetScanBuilder {
pub io_config: Option<IOConfig>,
pub multithreaded: bool,
pub schema: Option<SchemaRef>,
pub file_path_column: Option<String>,
}

impl ParquetScanBuilder {
@@ -605,6 +631,7 @@ impl ParquetScanBuilder {
multithreaded: true,
schema: None,
io_config: None,
file_path_column: None,
}
}
pub fn infer_schema(mut self, infer_schema: bool) -> Self {
@@ -642,6 +669,11 @@ impl ParquetScanBuilder {
self
}

pub fn file_path_column(mut self, file_path_column: String) -> Self {
self.file_path_column = Some(file_path_column);
self
}

pub fn finish(self) -> DaftResult<LogicalPlanBuilder> {
let cfg = ParquetSourceConfig {
coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit,
Expand All @@ -658,6 +690,7 @@ impl ParquetScanBuilder {
))),
self.infer_schema,
self.schema,
self.file_path_column,
)?);

LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None)
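
The builder logic above first prunes the scan schema to any pushed-down column projection and then appends the file path column as a Utf8 field, so a path column requested through the Python API survives a narrow select. A rough illustration (bucket, file, and column names are hypothetical):

    import daft

    # Ask the Parquet scan to expose the source path as a "path" column.
    df = daft.read_parquet("s3://bucket/events/*.parquet", file_path_column="path")

    # Even when only a subset of columns is selected, the output schema is the pruned
    # Parquet columns plus the appended Utf8 "path" column, matching the match arms above.
    df = df.select("id", "path")
    print(df.schema())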
5 changes: 5 additions & 0 deletions src/daft-scan/src/anonymous.rs
@@ -42,6 +42,10 @@ impl ScanOperator for AnonymousScanOperator {
&[]
}

fn file_path_column(&self) -> Option<&str> {
None
}

fn can_absorb_filter(&self) -> bool {
false
}
@@ -101,6 +105,7 @@ impl ScanOperator for AnonymousScanOperator {
schema.clone(),
storage_config.clone(),
pushdowns.clone(),
None,
)
.into())
},