Skip to content

Commit

Permalink
[CHORE] [Repartitioning] Refactor + hide PartitionSpec and rename t…
Browse files Browse the repository at this point in the history
…o `ClusteringSpec`. (#1961)

This PR refactors `PartitionSpec` to be a top-level parametrized enum,
renames it to `ClusteringSpec` to disambiguate it from the scan-level
`PartitionSpec`, and no longer exposes this abstraction to the Python
side of Daft for `df.repartition()` calls. Instead, we expose separate
`LogicalPlanBuilder` methods for hash-based repartitioning, random
shuffling, and simple repartitioning (coalesce/split), and keep
`ClusteringSpec` as a physical plan concept for how data is clustered
rather than doubling it as a repartitioning specification.

Closes #1963
  • Loading branch information
clarkzinzow authored Mar 7, 2024
1 parent 262aea1 commit 0179fd8
Show file tree
Hide file tree
Showing 20 changed files with 550 additions and 838 deletions.
58 changes: 3 additions & 55 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -138,58 +138,6 @@ class CountMode(Enum):
"""
...

class PartitionScheme(Enum):
"""
Partition scheme for Daft DataFrame.
"""

Range: int
Hash: int
Random: int
Unknown: int

class PartitionSpec:
"""
Partition specification for a Daft DataFrame.
"""

scheme: PartitionScheme
num_partitions: int
by: list[PyExpr]
scheme_config: RangeConfig | HashConfig | RandomConfig | UnknownConfig

@staticmethod
def range(by: list[PyExpr], num_partitions: int, descending: list[bool]) -> PartitionSpec: ...
@staticmethod
def hash(by: list[PyExpr], num_partitions: int) -> PartitionSpec: ...
@staticmethod
def random(num_partitions: int) -> PartitionSpec: ...
@staticmethod
def unknown(num_partitions: int) -> PartitionSpec: ...
def __eq__(self, other: PartitionSpec) -> bool: ... # type: ignore[override]
def __ne__(self, other: PartitionSpec) -> bool: ... # type: ignore[override]
def __str__(self) -> str: ...

class RangeConfig:
"""Configuration of a range partitioning."""

descending: list[bool]

class HashConfig:
"""Configuration of a hash partitioning."""

...

class RandomConfig:
"""Configuration of a random partitioning."""

...

class UnknownConfig:
"""Configuration of an unknown partitioning."""

...

class ResourceRequest:
"""
Resource request for a query fragment task.
Expand Down Expand Up @@ -1185,7 +1133,6 @@ class PhysicalPlanScheduler:
"""

def num_partitions(self) -> int: ...
def partition_spec(self) -> PartitionSpec: ...
def repr_ascii(self, simple: bool) -> str: ...
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ...

Expand All @@ -1209,12 +1156,13 @@ class LogicalPlanBuilder:
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ...
def repartition(
def hash_repartition(
self,
partition_by: list[PyExpr],
scheme: PartitionScheme,
num_partitions: int | None,
) -> LogicalPlanBuilder: ...
def random_shuffle(self, num_partitions: int | None) -> LogicalPlanBuilder: ...
def into_partitions(self, num_partitions: int) -> LogicalPlanBuilder: ...
def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ...
def distinct(self) -> LogicalPlanBuilder: ...
def sample(self, fraction: float, with_replacement: bool, seed: int | None) -> LogicalPlanBuilder: ...
Expand Down
28 changes: 7 additions & 21 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
from daft.daft import (
FileFormat,
IOConfig,
JoinStrategy,
JoinType,
PartitionScheme,
ResourceRequest,
)
from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, ResourceRequest
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
Expand Down Expand Up @@ -730,16 +723,13 @@ def repartition(self, num: Optional[int], *partition_by: ColumnInputType) -> "Da
"""
if len(partition_by) == 0:
warnings.warn(
"No columns specified for repartition; If you do not require rebalancing of partitions, you may "
"instead prefer using `df.into_partitions(N)` which is a cheaper operation that avoids shuffling data."
"No columns specified for repartition, so doing a random shuffle. If you do not require rebalancing of "
"partitions, you may instead prefer using `df.into_partitions(N)` which is a cheaper operation that "
"avoids shuffling data."
)
scheme = PartitionScheme.Random
exprs = []
builder = self._builder.random_shuffle(num)
else:
scheme = PartitionScheme.Hash
exprs = self.__column_input_to_expression(partition_by)

builder = self._builder.repartition(num_partitions=num, partition_by=exprs, scheme=scheme)
builder = self._builder.hash_repartition(num, self.__column_input_to_expression(partition_by))
return DataFrame(builder)

@DataframePublicAPI
Expand All @@ -758,11 +748,7 @@ def into_partitions(self, num: int) -> "DataFrame":
Returns:
DataFrame: Dataframe with ``num`` partitions.
"""
builder = self._builder.repartition(
num_partitions=num,
partition_by=[],
scheme=PartitionScheme.Unknown,
)
builder = self._builder.into_partitions(num)
return DataFrame(builder)

@DataframePublicAPI
Expand Down
21 changes: 11 additions & 10 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

from daft.daft import CountMode, FileFormat, IOConfig, JoinStrategy, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import (
PartitionScheme,
PyDaftExecutionConfig,
ResourceRequest,
ScanOperatorHandle,
)
from daft.daft import PyDaftExecutionConfig, ResourceRequest, ScanOperatorHandle
from daft.expressions import Expression, col
from daft.logical.schema import Schema
from daft.runners.partitioning import PartitionCacheEntry
Expand Down Expand Up @@ -126,11 +121,17 @@ def sort(self, sort_by: list[Expression], descending: list[bool] | bool = False)
builder = self._builder.sort(sort_by_pyexprs, descending)
return LogicalPlanBuilder(builder)

def repartition(
self, num_partitions: int | None, partition_by: list[Expression], scheme: PartitionScheme
) -> LogicalPlanBuilder:
def hash_repartition(self, num_partitions: int | None, partition_by: list[Expression]) -> LogicalPlanBuilder:
partition_by_pyexprs = [expr._expr for expr in partition_by]
builder = self._builder.repartition(partition_by_pyexprs, scheme, num_partitions=num_partitions)
builder = self._builder.hash_repartition(partition_by_pyexprs, num_partitions=num_partitions)
return LogicalPlanBuilder(builder)

def random_shuffle(self, num_partitions: int | None) -> LogicalPlanBuilder:
builder = self._builder.random_shuffle(num_partitions)
return LogicalPlanBuilder(builder)

def into_partitions(self, num_partitions: int) -> LogicalPlanBuilder:
builder = self._builder.into_partitions(num_partitions)
return LogicalPlanBuilder(builder)

def agg(
Expand Down
4 changes: 0 additions & 4 deletions daft/plan_scheduler/physical_plan_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

from daft.daft import PartitionSpec
from daft.daft import PhysicalPlanScheduler as _PhysicalPlanScheduler
from daft.execution import physical_plan
from daft.runners.partitioning import PartitionT
Expand All @@ -17,9 +16,6 @@ def __init__(self, scheduler: _PhysicalPlanScheduler):
def num_partitions(self) -> int:
return self._scheduler.num_partitions()

def partition_spec(self) -> PartitionSpec:
return self._scheduler.partition_spec()

def pretty_print(self, simple: bool = False) -> str:
"""
Pretty prints the current underlying physical plan.
Expand Down
75 changes: 34 additions & 41 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use crate::{
logical_ops,
logical_plan::LogicalPlan,
optimization::Optimizer,
partitioning::PartitionSchemeConfig,
partitioning::{
HashRepartitionConfig, IntoPartitionsConfig, RandomShuffleConfig, RepartitionSpec,
},
planner::plan,
sink_info::{OutputFileInfo, SinkInfo},
source_info::SourceInfo,
JoinStrategy, JoinType, PartitionScheme, PhysicalPlanScheduler, ResourceRequest,
JoinStrategy, JoinType, PhysicalPlanScheduler, ResourceRequest,
};
use common_error::{DaftError, DaftResult};
use common_io_config::IOConfig;
Expand Down Expand Up @@ -129,17 +131,32 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn repartition(
pub fn hash_repartition(
&self,
num_partitions: Option<usize>,
partition_by: Vec<Expr>,
scheme_config: PartitionSchemeConfig,
) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Repartition::try_new(
self.plan.clone(),
num_partitions,
partition_by,
scheme_config,
RepartitionSpec::Hash(HashRepartitionConfig::new(num_partitions, partition_by)),
)?
.into();
Ok(logical_plan.into())
}

pub fn random_shuffle(&self, num_partitions: Option<usize>) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Repartition::try_new(
self.plan.clone(),
RepartitionSpec::Random(RandomShuffleConfig::new(num_partitions)),
)?
.into();
Ok(logical_plan.into())
}

pub fn into_partitions(&self, num_partitions: usize) -> DaftResult<Self> {
let logical_plan: LogicalPlan = logical_ops::Repartition::try_new(
self.plan.clone(),
RepartitionSpec::IntoPartitions(IntoPartitionsConfig::new(num_partitions)),
)?
.into();
Ok(logical_plan.into())
Expand Down Expand Up @@ -333,53 +350,29 @@ impl PyLogicalPlanBuilder {
Ok(self.builder.sort(sort_by_exprs, descending)?.into())
}

pub fn repartition(
pub fn hash_repartition(
&self,
py: Python<'_>,
partition_by: Vec<PyExpr>,
scheme: PartitionScheme,
num_partitions: Option<usize>,
scheme_config: Option<PyObject>,
) -> PyResult<Self> {
let partition_by_exprs: Vec<Expr> = partition_by
.iter()
.map(|expr| expr.clone().into())
.collect();
let partition_scheme_config = match scheme {
PartitionScheme::Range => {
if let Some(scheme_config) = scheme_config {
PartitionSchemeConfig::Range(scheme_config.extract(py)?)
} else {
return Err(DaftError::ValueError(
"Must provide a scheme config with ascending/descending list if repartitioning by range.".to_string(),
).into());
}
}
PartitionScheme::Hash => PartitionSchemeConfig::Hash(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
PartitionScheme::Random => PartitionSchemeConfig::Random(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
PartitionScheme::Unknown => PartitionSchemeConfig::Unknown(
scheme_config
.map(|c| c.extract(py))
.transpose()?
.unwrap_or_default(),
),
};
Ok(self
.builder
.repartition(num_partitions, partition_by_exprs, partition_scheme_config)?
.hash_repartition(num_partitions, partition_by_exprs)?
.into())
}

pub fn random_shuffle(&self, num_partitions: Option<usize>) -> PyResult<Self> {
Ok(self.builder.random_shuffle(num_partitions)?.into())
}

pub fn into_partitions(&self, num_partitions: usize) -> PyResult<Self> {
Ok(self.builder.into_partitions(num_partitions)?.into())
}

pub fn distinct(&self) -> PyResult<Self> {
Ok(self.builder.distinct()?.into())
}
Expand Down
22 changes: 9 additions & 13 deletions src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@ mod test;
mod treenode;

pub use builder::{LogicalPlanBuilder, PyLogicalPlanBuilder};
use daft_scan::{
file_format::{
CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig,
},
storage_config::{NativeStorageConfig, PyStorageConfig},
};
use daft_scan::file_format::FileFormat;
pub use join::{JoinStrategy, JoinType};
pub use logical_plan::LogicalPlan;
pub use partitioning::{PartitionScheme, PartitionSchemeConfig, PartitionSpec};
pub use partitioning::ClusteringSpec;
pub use physical_plan::PhysicalPlanScheduler;
pub use resource_request::ResourceRequest;
pub use source_info::{FileInfo, FileInfos};

#[cfg(feature = "python")]
use daft_scan::storage_config::PythonStorageConfig;
#[cfg(feature = "python")]
use partitioning::PyPartitionSpec;
#[cfg(feature = "python")]
use pyo3::prelude::*;
#[cfg(feature = "python")]
use {
daft_scan::file_format::{
CsvSourceConfig, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig,
},
daft_scan::storage_config::{NativeStorageConfig, PyStorageConfig, PythonStorageConfig},
};

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
Expand All @@ -48,8 +46,6 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<ParquetSourceConfig>()?;
parent.add_class::<JsonSourceConfig>()?;
parent.add_class::<CsvSourceConfig>()?;
parent.add_class::<PyPartitionSpec>()?;
parent.add_class::<PartitionScheme>()?;
parent.add_class::<JoinType>()?;
parent.add_class::<JoinStrategy>()?;
parent.add_class::<PhysicalPlanScheduler>()?;
Expand Down
Loading

0 comments on commit 0179fd8

Please sign in to comment.