From 9b8c2b286c9c0cd746bdf48838f456814fd99d95 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Wed, 14 Aug 2024 18:17:32 -0700 Subject: [PATCH] [FEAT] Deprecates usage of resource_request on df.with_column API (#2654) Fully deprecates the use of `df.with_column(..., resource_request=...)` in favor of using resources on UDFs instead. --------- Co-authored-by: Jay Chia --- daft/daft.pyi | 2 +- daft/dataframe/dataframe.py | 27 ++-- daft/logical/builder.py | 7 +- src/daft-dsl/src/functions/python/mod.rs | 60 +------ src/daft-plan/src/builder.rs | 33 +--- tests/test_resource_requests.py | 190 +++++++---------------- 6 files changed, 78 insertions(+), 241 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 17de5febc6..2cab06ab5f 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -1619,7 +1619,7 @@ class LogicalPlanBuilder: @staticmethod def table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ... def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ... - def with_columns(self, columns: list[PyExpr], resource_request: ResourceRequest | None) -> LogicalPlanBuilder: ... + def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ... def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ... def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ... def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ... diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index dcf431d13c..44e04cddd1 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -31,7 +31,7 @@ from daft.api_annotations import DataframePublicAPI from daft.context import get_context from daft.convert import InputListType -from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, ResourceRequest, resolve_expr +from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, resolve_expr from daft.dataframe.preview import DataFramePreview from daft.datatype import DataType from daft.errors import ExpressionTypeError @@ -51,6 +51,7 @@ import ray import torch + from daft.daft import ResourceRequest from daft.io import DataCatalogTable from daft.logical.schema import Schema @@ -1218,7 +1219,7 @@ def with_column( self, column_name: str, expr: Expression, - resource_request: Optional[ResourceRequest] = None, + resource_request: Optional["ResourceRequest"] = None, ) -> "DataFrame": """Adds a column to the current DataFrame with an Expression, equivalent to a ``select`` with all current columns and the new one @@ -1245,26 +1246,26 @@ def with_column( Args: column_name (str): name of new column expr (Expression): expression of the new column. - resource_request (ResourceRequest): a custom resource request for the execution of this operation (NOTE: this will be deprecated - in Daft version 0.3.0. Please use resource requests on your UDFs instead.) Returns: DataFrame: DataFrame with new column. """ if resource_request is not None: - warnings.warn( - "Specifying resource_request through `with_column` will be deprecated from Daft version >= 0.3.0! " + raise ValueError( + "Specifying resource_request through `with_column` is deprecated from Daft version >= 0.3.0! " "Instead, please use the APIs on UDFs directly for controlling the resource requests of your UDFs. " + "You can define resource requests directly on the `@udf(num_gpus=N, num_cpus=M, ...)` decorator. " + "Alternatively, you can override resource requests on UDFs like so: `my_udf.override_options(num_gpus=N)`. 
" "Check the Daft documentation for more details." ) - return self.with_columns({column_name: expr}, resource_request) + return self.with_columns({column_name: expr}) @DataframePublicAPI def with_columns( self, columns: Dict[str, Expression], - resource_request: Optional[ResourceRequest] = None, + resource_request: Optional["ResourceRequest"] = None, ) -> "DataFrame": """Adds columns to the current DataFrame with Expressions, equivalent to a ``select`` with all current columns and the new ones @@ -1290,22 +1291,22 @@ def with_columns( Args: columns (Dict[str, Expression]): Dictionary of new columns in the format { name: expression } - resource_request (ResourceRequest): a custom resource request for the execution of this operation (NOTE: this will be deprecated - in Daft version 0.3.0. Please use resource requests on your UDFs instead.) Returns: DataFrame: DataFrame with new columns. """ if resource_request is not None: - warnings.warn( - "Specifying resource_request through `with_columns` will be deprecated from Daft version >= 0.3.0! " + raise ValueError( + "Specifying resource_request through `with_columns` is deprecated from Daft version >= 0.3.0! " "Instead, please use the APIs on UDFs directly for controlling the resource requests of your UDFs. " + "You can define resource requests directly on the `@udf(num_gpus=N, num_cpus=M, ...)` decorator. " + "Alternatively, you can override resource requests on UDFs like so: `my_udf.override_options(num_gpus=N)`. " "Check the Daft documentation for more details." ) new_columns = [col.alias(name) for name, col in columns.items()] - builder = self._builder.with_columns(new_columns, resource_request) + builder = self._builder.with_columns(new_columns) return DataFrame(builder) @DataframePublicAPI diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 383a452e55..db40e0a461 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -10,7 +10,6 @@ JoinStrategy, JoinType, PyDaftExecutionConfig, - ResourceRequest, ScanOperatorHandle, ) from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder @@ -123,11 +122,9 @@ def select( builder = self._builder.select(to_select_pyexprs) return LogicalPlanBuilder(builder) - def with_columns( - self, columns: list[Expression], custom_resource_request: ResourceRequest | None - ) -> LogicalPlanBuilder: + def with_columns(self, columns: list[Expression]) -> LogicalPlanBuilder: column_pyexprs = [expr._expr for expr in columns] - builder = self._builder.with_columns(column_pyexprs, custom_resource_request) + builder = self._builder.with_columns(column_pyexprs) return LogicalPlanBuilder(builder) def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: diff --git a/src/daft-dsl/src/functions/python/mod.rs b/src/daft-dsl/src/functions/python/mod.rs index f3b88c4ab1..a677a126d2 100644 --- a/src/daft-dsl/src/functions/python/mod.rs +++ b/src/daft-dsl/src/functions/python/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use common_error::DaftResult; use common_resource_request::ResourceRequest; -use common_treenode::{Transformed, TreeNode, TreeNodeRecursion}; +use common_treenode::{TreeNode, TreeNodeRecursion}; use daft_core::datatypes::DataType; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -141,64 +141,6 @@ pub fn stateful_udf( }) } -/// Replaces resource_requests on UDF expressions in the provided expression tree -pub fn replace_udf_resource_request( - expr: ExprRef, - override_resource_request: &ResourceRequest, -) -> ExprRef { - expr.transform(|e| match e.as_ref() { - 
Expr::Function { - func: - FunctionExpr::Python(PythonUDF::Stateful( - original @ StatefulPythonUDF { - resource_request, .. - }, - )), - inputs, - } => { - if let Some(existing_rr) = resource_request - && existing_rr == override_resource_request - { - return Ok(Transformed::no(e)); - } - let new_expr = Expr::Function { - func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { - resource_request: Some(override_resource_request.clone()), - ..original.clone() - })), - inputs: inputs.clone(), - }; - Ok(Transformed::yes(new_expr.arced())) - } - Expr::Function { - func: - FunctionExpr::Python(PythonUDF::Stateless( - original @ StatelessPythonUDF { - resource_request, .. - }, - )), - inputs, - } => { - if let Some(existing_rr) = resource_request - && existing_rr == override_resource_request - { - return Ok(Transformed::no(e)); - } - let new_expr = Expr::Function { - func: FunctionExpr::Python(PythonUDF::Stateless(StatelessPythonUDF { - resource_request: Some(override_resource_request.clone()), - ..original.clone() - })), - inputs: inputs.clone(), - }; - Ok(Transformed::yes(new_expr.arced())) - } - _ => Ok(Transformed::no(e)), - }) - .unwrap() - .data -} - /// Generates a ResourceRequest by inspecting an iterator of expressions. /// Looks for ResourceRequests on UDFs in each expression presented, and merges ResourceRequests across all expressions. pub fn get_resource_request(exprs: &[ExprRef]) -> Option { diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 74c1a7e8c5..341bdc1b40 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -17,12 +17,11 @@ use crate::{ use common_display::DisplayFormat; use common_error::DaftResult; use common_io_config::IOConfig; -use common_resource_request::ResourceRequest; use daft_core::{ join::{JoinStrategy, JoinType}, schema::{Schema, SchemaRef}, }; -use daft_dsl::{col, functions::python::replace_udf_resource_request, ExprRef}; +use daft_dsl::{col, ExprRef}; use daft_scan::{file_format::FileFormat, PhysicalScanInfo, Pushdowns, ScanOperatorRef}; #[cfg(feature = "python")] @@ -137,24 +136,7 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } - pub fn with_columns( - &self, - columns: Vec, - resource_request: Option, - ) -> DaftResult { - // TODO: This should be deprecated in Daft >= v0.3 - // - // Here we use resource_request to parametrize any UDFs in the new expression columns - // In the future, the ability to pass ResourceRequests into with_column(s) will be deprecated. Users will parametrize their UDFs directly instead. - let columns = if let Some(rr) = resource_request { - columns - .into_iter() - .map(|expr| replace_udf_resource_request(expr, &rr)) - .collect() - } else { - columns - }; - + pub fn with_columns(&self, columns: Vec) -> DaftResult { let fields = &self.schema().fields; let current_col_names = fields .iter() @@ -561,15 +543,8 @@ impl PyLogicalPlanBuilder { Ok(self.builder.select(pyexprs_to_exprs(to_select))?.into()) } - pub fn with_columns( - &self, - columns: Vec, - resource_request: Option, - ) -> PyResult { - Ok(self - .builder - .with_columns(pyexprs_to_exprs(columns), resource_request)? 
- .into()) + pub fn with_columns(&self, columns: Vec) -> PyResult { + Ok(self.builder.with_columns(pyexprs_to_exprs(columns))?.into()) } pub fn exclude(&self, to_exclude: Vec) -> PyResult { diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 79eea94c06..13ccf0cb74 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -6,7 +6,7 @@ import ray import daft -from daft import ResourceRequest, udf +from daft import udf from daft.context import get_context from daft.daft import SystemInfo from daft.expressions import col @@ -61,63 +61,41 @@ def test_partial_resource_request_overrides(): @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_requesting_too_many_cpus(use_legacy_resource_requesting): +def test_requesting_too_many_cpus(): df = daft.from_pydict(DATA) system_info = SystemInfo() - if use_legacy_resource_requesting: - df = df.with_column( - "foo", - my_udf(col("id")), - resource_request=ResourceRequest(num_cpus=system_info.cpu_count() + 1), - ) - else: - my_udf_parametrized = my_udf.override_options(num_cpus=system_info.cpu_count() + 1) - df = df.with_column( - "foo", - my_udf_parametrized(col("id")), - ) + my_udf_parametrized = my_udf.override_options(num_cpus=system_info.cpu_count() + 1) + df = df.with_column( + "foo", + my_udf_parametrized(col("id")), + ) with pytest.raises(RuntimeError): df.collect() @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_requesting_too_many_gpus(use_legacy_resource_requesting): +def test_requesting_too_many_gpus(): df = daft.from_pydict(DATA) - if use_legacy_resource_requesting: - df = df.with_column( - "foo", my_udf(col("id")), resource_request=ResourceRequest(num_gpus=cuda_device_count() + 1) - ) - else: - my_udf_parametrized = my_udf.override_options(num_gpus=cuda_device_count() + 1) - df = df.with_column("foo", my_udf_parametrized(col("id"))) + my_udf_parametrized = my_udf.override_options(num_gpus=cuda_device_count() + 1) + df = df.with_column("foo", my_udf_parametrized(col("id"))) with pytest.raises(RuntimeError): df.collect() @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_requesting_too_much_memory(use_legacy_resource_requesting): +def test_requesting_too_much_memory(): df = daft.from_pydict(DATA) system_info = SystemInfo() - if use_legacy_resource_requesting: - df = df.with_column( - "foo", - my_udf(col("id")), - resource_request=ResourceRequest(memory_bytes=system_info.total_memory() + 1), - ) - else: - my_udf_parametrized = my_udf.override_options(memory_bytes=system_info.total_memory() + 1) - df = df.with_column( - "foo", - my_udf_parametrized(col("id")), - ) + my_udf_parametrized = my_udf.override_options(memory_bytes=system_info.total_memory() + 1) + df = df.with_column( + "foo", + my_udf_parametrized(col("id")), + ) with pytest.raises(RuntimeError): df.collect() @@ -149,24 +127,14 @@ def assert_resources(c, num_cpus=None, num_gpus=None, memory=None): RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0" ) @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner 
to be in use") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_with_column_rayrunner(use_legacy_resource_requesting): +def test_with_column_rayrunner(): df = daft.from_pydict(DATA).repartition(2) - if use_legacy_resource_requesting: - df = df.with_column( - "resources_ok", - assert_resources(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000), - resource_request=ResourceRequest(num_cpus=1, memory_bytes=1_000_000, num_gpus=None), - ) - else: - assert_resources_parametrized = assert_resources.override_options( - num_cpus=1, memory_bytes=1_000_000, num_gpus=None - ) - df = df.with_column( - "resources_ok", - assert_resources_parametrized(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000), - ) + assert_resources_parametrized = assert_resources.override_options(num_cpus=1, memory_bytes=1_000_000, num_gpus=None) + df = df.with_column( + "resources_ok", + assert_resources_parametrized(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000), + ) df.collect() @@ -175,8 +143,7 @@ def test_with_column_rayrunner(use_legacy_resource_requesting): RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0" ) @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_with_column_folded_rayrunner(use_legacy_resource_requesting): +def test_with_column_folded_rayrunner(): df = daft.from_pydict(DATA).repartition(2) # Because of Projection Folding optimizations, the expected resource request is the max of the three .with_column requests @@ -186,28 +153,16 @@ def test_with_column_folded_rayrunner(use_legacy_resource_requesting): assert_resources(col("id"), **expected), ) - if use_legacy_resource_requesting: - df = df.with_column( - "more_memory_request", - assert_resources(col("id"), **expected), - resource_request=ResourceRequest(num_cpus=1, memory_bytes=5_000_000, num_gpus=None), - ) - df = df.with_column( - "more_cpu_request", - assert_resources(col("id"), **expected), - resource_request=ResourceRequest(num_cpus=1, memory_bytes=None, num_gpus=None), - ) - else: - assert_resources_1 = assert_resources.override_options(num_cpus=1, memory_bytes=5_000_000, num_gpus=None) - assert_resources_2 = assert_resources.override_options(num_cpus=1, memory_bytes=None, num_gpus=None) - df = df.with_column( - "more_memory_request", - assert_resources_1(col("id"), **expected), - ) - df = df.with_column( - "more_cpu_request", - assert_resources_2(col("id"), **expected), - ) + assert_resources_1 = assert_resources.override_options(num_cpus=1, memory_bytes=5_000_000, num_gpus=None) + assert_resources_2 = assert_resources.override_options(num_cpus=1, memory_bytes=None, num_gpus=None) + df = df.with_column( + "more_memory_request", + assert_resources_1(col("id"), **expected), + ) + df = df.with_column( + "more_cpu_request", + assert_resources_2(col("id"), **expected), + ) df.collect() @@ -233,24 +188,14 @@ def assert_num_cuda_visible_devices(c, num_gpus: int = 0): @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") @pytest.mark.skipif(no_gpu_available(), reason="requires GPUs to be available") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_with_column_pyrunner_gpu(use_legacy_resource_requesting): +def test_with_column_pyrunner_gpu(): df = daft.from_pydict(DATA).repartition(5) - # We do not do any masking of 
devices for the local PyRunner, even if the user requests fewer GPUs - # than the host actually has. - if use_legacy_resource_requesting: - df = df.with_column( - "foo", - assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()), - resource_request=ResourceRequest(num_gpus=1), - ) - else: - # We set num_gpus=1 on the UDF itself - df = df.with_column( - "foo", - assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()), - ) + # We set num_gpus=1 on the UDF itself + df = df.with_column( + "foo", + assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()), + ) df.collect() @@ -258,56 +203,33 @@ def test_with_column_pyrunner_gpu(use_legacy_resource_requesting): @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") @pytest.mark.skipif(no_gpu_available(), reason="requires GPUs to be available") @pytest.mark.parametrize("num_gpus", [None, 1]) -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_with_column_rayrunner_gpu(num_gpus, use_legacy_resource_requesting): +def test_with_column_rayrunner_gpu(num_gpus): df = daft.from_pydict(DATA).repartition(2) - if use_legacy_resource_requesting: - df = df.with_column( - "num_cuda_visible_devices", - assert_num_cuda_visible_devices(col("id"), num_gpus=num_gpus if num_gpus is not None else 0), - resource_request=ResourceRequest(num_gpus=num_gpus), - ) - else: - assert_num_cuda_visible_devices_parametrized = assert_num_cuda_visible_devices.override_options( - num_gpus=num_gpus - ) - df = df.with_column( - "num_cuda_visible_devices", - assert_num_cuda_visible_devices_parametrized(col("id"), num_gpus=num_gpus if num_gpus is not None else 0), - ) + assert_num_cuda_visible_devices_parametrized = assert_num_cuda_visible_devices.override_options(num_gpus=num_gpus) + df = df.with_column( + "num_cuda_visible_devices", + assert_num_cuda_visible_devices_parametrized(col("id"), num_gpus=num_gpus if num_gpus is not None else 0), + ) df.collect() @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") @pytest.mark.skipif(no_gpu_available(), reason="requires GPUs to be available") -@pytest.mark.parametrize("use_legacy_resource_requesting", [True, False]) -def test_with_column_max_resources_rayrunner_gpu(use_legacy_resource_requesting): +def test_with_column_max_resources_rayrunner_gpu(): df = daft.from_pydict(DATA).repartition(2) # Because of projection folding optimizations, both UDFs should run with num_gpus=1 even though 0_gpu_col requested for 0 GPUs - if use_legacy_resource_requesting: - df = df.with_column( - "0_gpu_col", - assert_num_cuda_visible_devices(col("id"), num_gpus=1), - resource_request=ResourceRequest(num_gpus=0), - ) - df = df.with_column( - "1_gpu_col", - assert_num_cuda_visible_devices(col("id"), num_gpus=1), - resource_request=ResourceRequest(num_gpus=1), - ) - else: - assert_num_cuda_visible_devices_0 = assert_num_cuda_visible_devices.override_options(num_gpus=0) - assert_num_cuda_visible_devices_1 = assert_num_cuda_visible_devices.override_options(num_gpus=1) - df = df.with_column( - "0_gpu_col", - assert_num_cuda_visible_devices_0(col("id"), num_gpus=1), - ) - df = df.with_column( - "1_gpu_col", - assert_num_cuda_visible_devices_1(col("id"), num_gpus=1), - ) + assert_num_cuda_visible_devices_0 = assert_num_cuda_visible_devices.override_options(num_gpus=0) + assert_num_cuda_visible_devices_1 = assert_num_cuda_visible_devices.override_options(num_gpus=1) + df = 
df.with_column( + "0_gpu_col", + assert_num_cuda_visible_devices_0(col("id"), num_gpus=1), + ) + df = df.with_column( + "1_gpu_col", + assert_num_cuda_visible_devices_1(col("id"), num_gpus=1), + ) df.collect()
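
Usage note (illustrative sketch, not part of the patch): after this change, resource requests are attached to the UDF itself instead of being passed to `with_column`/`with_columns`. The example below assumes Daft's standard `@udf(return_dtype=...)` decorator together with the `num_cpus`/`num_gpus`/`memory_bytes` options and the `override_options` API referenced in the error messages and tests above; the UDF, column, and data names are hypothetical.

    import daft
    from daft import DataType, udf
    from daft.expressions import col

    # Resources are declared on the UDF itself, not on with_column(...).
    @udf(return_dtype=DataType.int64(), num_cpus=1)
    def my_udf(x):
        # Identity transform; a real UDF would do the CPU/GPU-heavy work here.
        return x

    df = daft.from_pydict({"id": [1, 2, 3]})

    # Use the UDF with the resources declared on its decorator...
    df = df.with_column("foo", my_udf(col("id")))

    # ...or override the resource request on the UDF before calling it
    # (mirrors `my_udf.override_options(num_gpus=N)` from the error message;
    # requires a GPU to actually run).
    my_udf_on_gpu = my_udf.override_options(num_gpus=1)
    df = df.with_column("bar", my_udf_on_gpu(col("id")))

    df.collect()

Passing `resource_request=` to `df.with_column(...)` now raises a ValueError pointing users at this pattern.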