Skip to content

Commit

Permalink
This is a combination of 3 commits.
Browse files Browse the repository at this point in the history
fix custom task name for background tasks

Update src/prefect/tasks.py

revert weird change
  • Loading branch information
zzstoatzz committed Oct 23, 2024
1 parent 8dfcc06 commit cfb6c6e
Showing 1 changed file with 34 additions and 66 deletions.
100 changes: 34 additions & 66 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# This file requires type-checking with pyright because mypy does not yet support PEP612
# See https://github.com/python/mypy/issues/8645

import asyncio
import datetime
import inspect
from copy import copy
Expand Down Expand Up @@ -310,9 +309,7 @@ def __init__(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str, Type[NotSet]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
retries: Optional[int] = None,
retry_delay_seconds: Optional[
Union[
Expand Down Expand Up @@ -373,7 +370,7 @@ def __init__(

# the task is considered async if its function is async or an async
# generator
self.isasync = asyncio.iscoroutinefunction(
self.isasync = inspect.iscoroutinefunction(
self.fn
) or inspect.isasyncgenfunction(self.fn)

Expand Down Expand Up @@ -533,9 +530,7 @@ def with_options(
cache_key_fn: Optional[
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str, Type[NotSet]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str, Type[NotSet]]] = NotSet,
cache_expiration: Optional[datetime.timedelta] = None,
retries: Union[int, Type[NotSet]] = NotSet,
retry_delay_seconds: Union[
Expand Down Expand Up @@ -731,6 +726,7 @@ async def create_run(
wait_for: Optional[Iterable[PrefectFuture]] = None,
extra_task_inputs: Optional[Dict[str, Set[TaskRunInput]]] = None,
deferred: bool = False,
custom_task_run_name: Optional[str] = None,
) -> TaskRun:
from prefect.utilities.engine import (
_dynamic_key_for_task_run,
Expand All @@ -747,14 +743,14 @@ async def create_run(
client = get_client()

async with client:
task_run_name = custom_task_run_name or self.name
if not flow_run_context:
dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
task_run_name = self.name
else:
dynamic_key = _dynamic_key_for_task_run(
context=flow_run_context, task=self
)
task_run_name = f"{self.name}-{dynamic_key}"
task_run_name += f"-{str(dynamic_key)[:3]}"

if deferred:
state = Scheduled()
Expand Down Expand Up @@ -1064,8 +1060,6 @@ def submit(
task runner. This call only blocks execution while the task is being submitted,
once it is submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Arguments to run the task with
return_state: Return the result of the flow run wrapped in a
Expand Down Expand Up @@ -1118,7 +1112,7 @@ def submit(
>>>
>>> @flow
>>> async def my_flow():
>>> my_async_task.submit()
>>> await my_async_task.submit()
Run a sync task in an async flow
Expand Down Expand Up @@ -1176,73 +1170,51 @@ def submit(

@overload
def map(
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[NoReturn]]:
...

@overload
def map(
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
...

@overload
def map(
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
return_state: Literal[False],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
self: "Task[P, T]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
...

def map(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = None,
wait_for: Optional[Iterable[PrefectFuture]] = None,
deferred: bool = False,
**kwargs: Any,
):
Expand All @@ -1263,8 +1235,6 @@ def map(
also blocks while the tasks are being submitted, once they are
submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Iterable and static arguments to run the tasks with
return_state: Return a list of Prefect States that wrap the results
Expand Down Expand Up @@ -1466,6 +1436,7 @@ def apply_async(
>>> y = task_2.apply_async(wait_for=[x])
"""
from prefect.utilities.engine import _resolve_custom_task_run_name
from prefect.utilities.visualization import (
VisualizationUnsupportedError,
get_task_viz_tracker,
Expand All @@ -1488,8 +1459,9 @@ def apply_async(
deferred=True,
wait_for=wait_for,
extra_task_inputs=dependencies,
custom_task_run_name=_resolve_custom_task_run_name(self, parameters), # type: ignore
)
) # type: ignore
)

from prefect.utilities.engine import emit_task_run_state_change_event

Expand Down Expand Up @@ -1587,9 +1559,7 @@ def task(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str, Type[NotSet]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
retries: int = 0,
retry_delay_seconds: Union[
float,
Expand Down Expand Up @@ -1626,9 +1596,7 @@ def task(
Callable[["TaskRunContext", Dict[str, Any]], Optional[str]], None
] = None,
cache_expiration: Optional[datetime.timedelta] = None,
task_run_name: Optional[
Union[Callable[[], str], Callable[[Dict[str, Any]], str], str, Type[NotSet]]
] = None,
task_run_name: Optional[Union[Callable[[], str], str]] = None,
retries: Optional[int] = None,
retry_delay_seconds: Union[
float, int, List[float], Callable[[int], List[float]], None
Expand Down

0 comments on commit cfb6c6e

Please sign in to comment.