From 20940b98d716e4092b98b829daa95d2a886fbd15 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 14:01:34 -0500 Subject: [PATCH 1/7] initial commit --- src/prefect/engine.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/prefect/engine.py b/src/prefect/engine.py index de526b616cc4..c45d123413b3 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1604,6 +1604,12 @@ async def create_task_run_future( ) ) + # Remove any previous task run futures with the same name + flow_run_context.task_run_futures = [ + future for future in flow_run_context.task_run_futures + if not future.name.startswith(task.name) + ] + # Track the task run future in the flow run context flow_run_context.task_run_futures.append(future) From 7b940d2c7dcf635a0dae3dd55a16afc2c4cf095c Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 14:10:03 -0500 Subject: [PATCH 2/7] lint --- src/prefect/engine.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/prefect/engine.py b/src/prefect/engine.py index c45d123413b3..612cd8e62e6d 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1606,7 +1606,8 @@ async def create_task_run_future( # Remove any previous task run futures with the same name flow_run_context.task_run_futures = [ - future for future in flow_run_context.task_run_futures + future + for future in flow_run_context.task_run_futures if not future.name.startswith(task.name) ] From a917e8dde23a74ad366c3751252f9ffb36113365 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 14:13:46 -0500 Subject: [PATCH 3/7] lint --- src/prefect/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/engine.py b/src/prefect/engine.py index 612cd8e62e6d..b7e61eb1ded0 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1606,7 +1606,7 @@ async def create_task_run_future( # Remove any previous task run futures with the same name flow_run_context.task_run_futures = [ - future + future for future in flow_run_context.task_run_futures if not future.name.startswith(task.name) ] From c06115762307f9fb5e810c8b505f82fc01e827c3 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:17:07 -0500 Subject: [PATCH 4/7] update implementation --- src/prefect/engine.py | 18 +++++++++--------- src/prefect/tasks.py | 4 ++-- src/prefect/utilities/engine.py | 1 + 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/prefect/engine.py b/src/prefect/engine.py index b7e61eb1ded0..21e5d5b5267a 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1375,6 +1375,7 @@ def enter_task_run_engine( return_type: EngineReturnType, task_runner: Optional[BaseTaskRunner], mapped: bool, + entering_from_task_run: Optional[bool] = False, ) -> Union[PrefectFuture, Awaitable[PrefectFuture], TaskRun]: """Sync entrypoint for task calls""" @@ -1410,6 +1411,7 @@ def enter_task_run_engine( wait_for=wait_for, return_type=return_type, task_runner=task_runner, + entering_from_task_run=entering_from_task_run, ) if task.isasync and ( @@ -1536,6 +1538,7 @@ async def get_task_call_return_value( return_type: EngineReturnType, task_runner: Optional[BaseTaskRunner], extra_task_inputs: Optional[Dict[str, Set[TaskRunInput]]] = None, + entering_from_task_run: Optional[bool] = False, ): extra_task_inputs = extra_task_inputs or {} @@ -1546,6 +1549,7 @@ async def get_task_call_return_value( wait_for=wait_for, task_runner=task_runner, extra_task_inputs=extra_task_inputs, + entering_from_task_run=entering_from_task_run, ) if return_type == "future": return future @@ -1564,12 +1568,14 @@ async def create_task_run_future( wait_for: Optional[Iterable[PrefectFuture]], task_runner: Optional[BaseTaskRunner], extra_task_inputs: Dict[str, Set[TaskRunInput]], + entering_from_task_run: Optional[bool] = False, ) -> PrefectFuture: # Default to the flow run's task runner task_runner = task_runner or flow_run_context.task_runner # Generate a name for the future dynamic_key = _dynamic_key_for_task_run(flow_run_context, task) + task_run_name = ( f"{task.name}-{dynamic_key}" if flow_run_context and flow_run_context.flow_run @@ -1604,15 +1610,9 @@ async def create_task_run_future( ) ) - # Remove any previous task run futures with the same name - flow_run_context.task_run_futures = [ - future - for future in flow_run_context.task_run_futures - if not future.name.startswith(task.name) - ] - - # Track the task run future in the flow run context - flow_run_context.task_run_futures.append(future) + if not entering_from_task_run: + # Track the task run future in the flow run context + flow_run_context.task_run_futures.append(future) if task_runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL: await future._wait() diff --git a/src/prefect/tasks.py b/src/prefect/tasks.py index bba5517e03f3..f5a31cd7bb42 100644 --- a/src/prefect/tasks.py +++ b/src/prefect/tasks.py @@ -685,6 +685,7 @@ def __call__( return_type=return_type, client=get_client(), ) + entering_from_task_run = bool(TaskRunContext.get()) return enter_task_run_engine( self, @@ -693,6 +694,7 @@ def __call__( task_runner=SequentialTaskRunner(), return_type=return_type, mapped=False, + entering_from_task_run=entering_from_task_run, ) @overload @@ -735,7 +737,6 @@ def _run( # Convert the call args/kwargs to a parameter dict parameters = get_call_parameters(self.fn, args, kwargs) - return enter_task_run_engine( self, parameters=parameters, @@ -1174,7 +1175,6 @@ def map( return from_async.wait_for_call_in_loop_thread(map_call) else: return from_sync.wait_for_call_in_loop_thread(map_call) - return enter_task_run_engine( self, parameters=parameters, diff --git a/src/prefect/utilities/engine.py b/src/prefect/utilities/engine.py index 1fdd794dbb1a..77a801fa6ab9 100644 --- a/src/prefect/utilities/engine.py +++ b/src/prefect/utilities/engine.py @@ -144,6 +144,7 @@ async def wait_for_task_runs_and_report_crashes( result = await client.set_task_run_state( task_run_id=future.task_run.id, state=state, force=True ) + if result.status == SetStateStatus.ACCEPT: engine_logger.debug( f"Reported crashed task run {future.name!r} successfully." From 5a46fbd1ec0df9307641d35ae0128699959f5557 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:19:27 -0500 Subject: [PATCH 5/7] linting --- src/prefect/tasks.py | 2 ++ src/prefect/utilities/engine.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/prefect/tasks.py b/src/prefect/tasks.py index f5a31cd7bb42..29069cd7499f 100644 --- a/src/prefect/tasks.py +++ b/src/prefect/tasks.py @@ -737,6 +737,7 @@ def _run( # Convert the call args/kwargs to a parameter dict parameters = get_call_parameters(self.fn, args, kwargs) + return enter_task_run_engine( self, parameters=parameters, @@ -1175,6 +1176,7 @@ def map( return from_async.wait_for_call_in_loop_thread(map_call) else: return from_sync.wait_for_call_in_loop_thread(map_call) + return enter_task_run_engine( self, parameters=parameters, diff --git a/src/prefect/utilities/engine.py b/src/prefect/utilities/engine.py index 77a801fa6ab9..1fdd794dbb1a 100644 --- a/src/prefect/utilities/engine.py +++ b/src/prefect/utilities/engine.py @@ -144,7 +144,6 @@ async def wait_for_task_runs_and_report_crashes( result = await client.set_task_run_state( task_run_id=future.task_run.id, state=state, force=True ) - if result.status == SetStateStatus.ACCEPT: engine_logger.debug( f"Reported crashed task run {future.name!r} successfully." From ce8cfa8290175b056ad097738aa3a57f1c2d4b48 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 15:32:19 -0500 Subject: [PATCH 6/7] dont passed to mapped task --- src/prefect/engine.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/prefect/engine.py b/src/prefect/engine.py index 21e5d5b5267a..f5647fb28f24 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1403,15 +1403,20 @@ def enter_task_run_engine( if flow_run_context.timeout_scope and flow_run_context.timeout_scope.cancel_called: raise TimeoutError("Flow run timed out") + call_arguments = { + "task": task, + "flow_run_context": flow_run_context, + "parameters": parameters, + "wait_for": wait_for, + "return_type": return_type, + "task_runner": task_runner, + } + + if not mapped: + call_arguments["entering_from_task_run"] = entering_from_task_run + begin_run = create_call( - begin_task_map if mapped else get_task_call_return_value, - task=task, - flow_run_context=flow_run_context, - parameters=parameters, - wait_for=wait_for, - return_type=return_type, - task_runner=task_runner, - entering_from_task_run=entering_from_task_run, + begin_task_map if mapped else get_task_call_return_value, **call_arguments ) if task.isasync and ( From 077618451677a69c661a4f7566379a198b187919 Mon Sep 17 00:00:00 2001 From: Serina Grill <42048900+serinamarie@users.noreply.github.com> Date: Mon, 1 Jul 2024 16:49:45 -0500 Subject: [PATCH 7/7] add a test --- tests/test_tasks.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index b344cecd7d38..81d408f140f5 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -4226,6 +4226,35 @@ def my_flow(): assert result == "Failed" assert count == 2 + def test_nested_task_with_retries_on_outer_task(self): + """ + Regression test for https://github.com/PrefectHQ/prefect/issues/14390 + where the flow run would be marked as failed despite the tasks eventually succeeding. + """ + + failed = False + + @task + def nested_flaky_task(): + # This task will fail the first time it is run, but will succeed if called a second time + nonlocal failed + if not failed: + failed = True + raise ValueError("Forced task failure") + + @task( + retries=1, + ) + def top_task(): + nested_flaky_task() + + @flow + def nested_task_flow(): + top_task() + + result = nested_task_flow() + assert result[0].is_completed() + def test_nested_task_with_retries_on_inner_and_outer_task(self): count = 0