From f5b8dfa1906304fe07ed088471989f31c4d0ac63 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Tue, 21 May 2024 19:46:18 -0700 Subject: [PATCH] [BUG] Fix multi-output tasks in RayRunner (#2291) Fixes a user-reported bug around `into_partitions` failing on the Ray runner for a specific corner-case: when **some** input partition wasn't split, the RayRunner was performing an overly-aggressive assert that should be relaxed. --------- Co-authored-by: Jay Chia --- daft/runners/ray_runner.py | 5 ++++- tests/dataframe/test_repartition.py | 12 ++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index a73fdd0781..60aa80992d 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -560,7 +560,9 @@ def place_in_queue(item): # If it is a no-op task, just run it locally immediately. elif len(next_step.instructions) == 0: logger.debug("Running task synchronously in main thread: %s", next_step) - assert isinstance(next_step, SingleOutputPartitionTask) + assert ( + len(next_step.partial_metadatas) == 1 + ), "No-op tasks must have one output by definition, since there are no instructions to run" [single_partial] = next_step.partial_metadatas if single_partial.num_rows is None: [single_meta] = ray.get(get_metas.remote(next_step.inputs)) @@ -577,6 +579,7 @@ def place_in_queue(item): ) ] ) + next_step.set_result( [RayMaterializedResult(partition, accessor, 0) for partition in next_step.inputs] ) diff --git a/tests/dataframe/test_repartition.py b/tests/dataframe/test_repartition.py index ff864a73f4..3c559131cc 100644 --- a/tests/dataframe/test_repartition.py +++ b/tests/dataframe/test_repartition.py @@ -11,3 +11,15 @@ def test_into_partitions_coalesce(make_df) -> None: data = {"foo": list(range(100))} df = make_df(data).into_partitions(20).into_partitions(1).collect() assert df.to_pydict() == data + + +def test_into_partitions_some_no_split(make_df) -> None: + data = {"foo": [1, 2, 3]} + + # Materialize as 3 partitions + df = make_df(data).into_partitions(3).collect() + + # Attempt to split into 4 partitions, so only 1 split occurs + df = df.into_partitions(4).collect() + + assert df.to_pydict() == data