Skip to content

Commit

Permalink
fix custom task name for background tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Oct 23, 2024
1 parent 08bf7d2 commit 6b1d291
Showing 1 changed file with 75 additions and 37 deletions.
112 changes: 75 additions & 37 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This file requires type-checking with pyright because mypy does not yet support PEP612
# See https://github.com/python/mypy/issues/8645

import asyncio
import datetime
import inspect
from copy import copy
Expand Down Expand Up @@ -309,7 +310,9 @@ def __init__(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str]
] = None,
retries: Optional[int] = None,
retry_delay_seconds: Optional[
Union[
Expand Down Expand Up @@ -370,7 +373,7 @@ def __init__(

# the task is considered async if its function is async or an async
# generator
self.isasync = inspect.iscoroutinefunction(
self.isasync = asyncio.iscoroutinefunction(
self.fn
) or inspect.isasyncgenfunction(self.fn)

Expand Down Expand Up @@ -530,7 +533,9 @@ def with_options(
cache_key_fn: Optional[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str, Type[NotSet]]] = NotSet,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str, Type[NotSet]]
] = NotSet,
cache_expiration: Optional[datetime.timedelta] = None,
retries: Union[int, Type[NotSet]] = NotSet,
retry_delay_seconds: Union[
Expand Down Expand Up @@ -744,13 +749,12 @@ async def create_run(

async with client:
task_run_name = custom_task_run_name or self.name
if not flow_run_context:
dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
else:
dynamic_key = _dynamic_key_for_task_run(
context=flow_run_context, task=self
)
task_run_name += f"-{str(dynamic_key)[:3]}"
dynamic_key = (
_dynamic_key_for_task_run(context=flow_run_context, task=self)
if flow_run_context
else str(uuid4().hex)
)
task_run_name += f"-{str(dynamic_key)[:3]}"

if deferred:
state = Scheduled()
Expand Down Expand Up @@ -1060,6 +1064,8 @@ def submit(
task runner. This call only blocks execution while the task is being submitted,
once it is submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Arguments to run the task with
return_state: Return the result of the flow run wrapped in a
Expand Down Expand Up @@ -1112,7 +1118,7 @@ def submit(
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task.submit()
>>> my_async_task.submit()
Run a sync task in an async flow
Expand Down Expand Up @@ -1170,51 +1176,73 @@ def submit(

@overload
def map(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[NoReturn]]:
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

@overload
def map(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

@overload
def map(
self: "Task[P, T]",
*args: P.args,
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
return_state: Literal[False],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

def map(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = None,
deferred: bool = False,
**kwargs: Any,
):
Expand All @@ -1235,6 +1263,8 @@ def map(
also blocks while the tasks are being submitted, once they are
submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Iterable and static arguments to run the tasks with
return_state: Return a list of Prefect States that wrap the results
Expand Down Expand Up @@ -1459,9 +1489,13 @@ def apply_async(
deferred=True,
wait_for=wait_for,
extra_task_inputs=dependencies,
custom_task_run_name=_resolve_custom_task_run_name(self, parameters), # type: ignore
custom_task_run_name=(
_resolve_custom_task_run_name(self, parameters)
if self.task_run_name is not None
else None
),
)
)
) # type: ignore

from prefect.utilities.engine import emit_task_run_state_change_event

Expand Down Expand Up @@ -1559,7 +1593,9 @@ def task(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str]
] = None,
retries: int = 0,
retry_delay_seconds: Union[
float,
Expand Down Expand Up @@ -1596,7 +1632,9 @@ def task(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]], None
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str]
] = None,
retries: Optional[int] = None,
retry_delay_seconds: Union[
float, int, List[float], Callable[[int], List[float]], None
Expand Down

0 comments on commit 6b1d291

Please sign in to comment.