Skip to content

Commit

Permalink
Implement hive partitioned reads
Browse files Browse the repository at this point in the history
  • Loading branch information
desmondcheongzx committed Oct 12, 2024
1 parent c694c9e commit b84a598
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 113 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 15 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,32 +112,33 @@ tikv-jemallocator = {version = "0.5.4", features = [
[workspace]
members = [
"src/arrow2",
"src/parquet2",
"src/common/daft-config",
"src/common/display",
"src/common/error",
"src/common/io-config",
"src/common/treenode",
"src/common/daft-config",
"src/common/system-info",
"src/common/treenode",
"src/daft-core",
"src/daft-local-execution",
"src/daft-io",
"src/daft-image",
"src/daft-parquet",
"src/daft-csv",
"src/daft-json",
"src/daft-dsl",
"src/daft-table",
"src/daft-plan",
"src/daft-physical-plan",
"src/daft-functions",
"src/daft-functions-json",
"src/daft-hive",
"src/daft-image",
"src/daft-io",
"src/daft-json",
"src/daft-local-execution",
"src/daft-micropartition",
"src/daft-parquet",
"src/daft-physical-plan",
"src/daft-plan",
"src/daft-scan",
"src/daft-scheduler",
"src/daft-sketch",
"src/daft-functions",
"src/daft-functions-json",
"src/daft-sql",
"src/hyperloglog"
"src/daft-table",
"src/hyperloglog",
"src/parquet2"
]

[workspace.dependencies]
Expand Down
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ class ScanOperatorHandle:
glob_path: list[str],
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
hive_partitioning: bool,
infer_schema: bool,
schema: PySchema | None = None,
file_path_column: str | None = None,
Expand Down
3 changes: 3 additions & 0 deletions daft/io/_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def read_csv(
allow_variable_columns: bool = False,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
Expand All @@ -56,6 +57,7 @@ def read_csv(
allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
Expand Down Expand Up @@ -100,5 +102,6 @@ def read_csv(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def read_json(
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
Expand All @@ -43,6 +44,7 @@ def read_json(
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
Expand Down Expand Up @@ -77,5 +79,6 @@ def read_json(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def read_parquet(
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
Expand All @@ -47,6 +48,7 @@ def read_parquet(
schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to use hive-style partitioning when reading glob files. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
Expand Down Expand Up @@ -96,5 +98,6 @@ def read_parquet(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
2 changes: 2 additions & 0 deletions daft/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def get_tabular_files_scan(
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
file_path_column: str | None = None,
hive_partitioning: bool = False,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
Expand All @@ -42,6 +43,7 @@ def get_tabular_files_scan(
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)

builder = LogicalPlanBuilder.from_tabular_scan(
Expand Down
16 changes: 16 additions & 0 deletions src/daft-hive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[dependencies]
arrow2 = {workspace = true, features = ["io_parquet", "io_parquet_compression"]}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-decoding = {path = "../daft-decoding", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}

[lints]
workspace = true

[package]
edition = {workspace = true}
name = "daft-hive"
version = {workspace = true}
89 changes: 89 additions & 0 deletions src/daft-hive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::collections::HashMap;

use common_error::DaftResult;
use daft_core::{datatypes::Utf8Array, series::IntoSeries};
use daft_decoding::inference::infer;
use daft_schema::{dtype::DaftDataType, field::Field, schema::Schema};
use daft_table::Table;

/// Parses hive-style /key=value/ components from a uri.
pub fn parse_hive_partitioning(uri: &str) -> HashMap<&str, &str> {
let mut equality_pos = 0;
let mut partition_start = 0;
let mut valid_partition = true;
let mut partitions = HashMap::new();
// Loops through the uri looking for valid partitions. Although we consume partitions only when
// encountering a slash separator, we never need to check the last "partition" at the end of the
// uri because this needs to be a file name.
for (idx, c) in uri.char_indices() {
match c {
// A '?' char indicates the start of GET parameters, so stop parsing hive partitions.
// We also ban '\n' for hive partitions, given all the edge cases that can arise.
'?' | '\n' => break,
'\\' | '/' => {
if valid_partition && equality_pos > partition_start {
let key = &uri[partition_start..equality_pos];
let value = &uri[equality_pos + 1..idx];
partitions.insert(key, value);
}
partition_start = idx + 1;
valid_partition = true;
}
'=' => {
// If we see more than one '=' in the partition, it is not a valid partition.
if equality_pos > partition_start {
valid_partition = false;
}
equality_pos = idx;
}
_ => (),
}
}
partitions
}

/// Takes hive partition key-value pairs as `partitions`, and the schema of the containing table as
/// `table_schema`, and returns a 1-dimensional table containing the partition keys as columns, and
/// their partition values as the singular row of values.
pub fn hive_partitions_to_1d_table(
partitions: &HashMap<&str, &str>,
table_schema: &Schema,
) -> DaftResult<Table> {
let uncasted_fields = partitions
.keys()
.map(|&key| Field::new(key, daft_schema::dtype::DataType::Utf8))
.collect();
let uncasted_schema = Schema::new(uncasted_fields)?;
let uncasted_series = partitions
.iter()
.map(|(&key, &value)| {
let arrow_array = arrow2::array::Utf8Array::from_iter_values(std::iter::once(&value));
let daft_array = Utf8Array::from((key, Box::new(arrow_array)));
daft_array.into_series()
})
.collect::<Vec<_>>();
let uncast_table = Table::new_unchecked(uncasted_schema, uncasted_series, /*num_rows=*/ 1);

let partition_fields = table_schema
.fields
.clone()
.into_iter()
.map(|(_, field)| field)
.filter(|field| partitions.contains_key(&field.name.as_str()))
.collect();
let partition_schema = Schema::new(partition_fields)?;
// TODO(desmond): There's probably a better way to do this.
let casted_table = uncast_table.cast_to_schema(&partition_schema)?;
Ok(casted_table)
}

/// Turns hive partition key-value pairs into a schema with the partitions' keys as field names, and
/// inferring field types from the partitions' values. We don't do schema type inference here as the
/// user is expected to provide the schema for hive-partitioned fields.
pub fn hive_partitions_to_schema(partitions: &HashMap<&str, &str>) -> DaftResult<Schema> {
let partition_fields: Vec<Field> = partitions
.iter()
.map(|(&key, &value)| Field::new(key, DaftDataType::from(&infer(value.as_bytes()))))
.collect();
Schema::new(partition_fields)
}
2 changes: 2 additions & 0 deletions src/daft-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ common-error = {path = "../common/error", default-features = false}
crossbeam-channel = "0.5.1"
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-hive = {path = "../daft-hive", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
Expand Down
Loading

0 comments on commit b84a598

Please sign in to comment.