Callable task name generators (3.0 vs. 2.x) #15747

Closed
dmichaelcarter opened this issue Oct 17, 2024 · 6 comments · Fixed by #15773
Labels
bug Something isn't working

Comments

@dmichaelcarter
dmichaelcarter commented Oct 17, 2024

Bug summary

I am doing some spike work on migrating our flows from v2.19 to v3.0, and I have found what appears to be a bug with callable task name generators. I often use task variables (more specifically, values in a task's dictionary argument) for naming task threads. This is essential for making sense of complex graphs that include dozens, if not hundreds, of mapped tasks. I often name a task run after a value such as a table name during ETL processes; when runs are named by an arbitrary ID instead, gauging the impact of failed tasks is much more of a hassle.

  • It seems that prefect.runtime.task_run may not be properly awaiting task args

The error I am seeing with my callable name generator (which works in Prefect v2.19):
TypeError: 'PrefectConcurrentFuture' object is not subscriptable
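
For context, this kind of TypeError is what Python raises when code indexes into a future object instead of its resolved result. Below is a minimal sketch using the standard library's concurrent.futures. It is not Prefect's own future class, which is only assumed here to behave analogously, and make_input and subscript_error are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor


def make_input() -> dict:
    """Stand-in for an upstream task that returns a dict."""
    return {"val": 1}


def subscript_error(obj) -> str:
    """Return the TypeError message raised by obj["val"], or "" if none."""
    try:
        obj["val"]
    except TypeError as exc:
        return str(exc)
    return ""


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(make_input)
    # Indexing the future itself fails: it is a handle, not the dict.
    error_message = subscript_error(future)
    # Indexing the resolved result works.
    resolved_value = future.result()["val"]

print(error_message)
print(resolved_value)
```

If the name generator receives an unresolved future where it expects the dict, the ["val"] lookup would fail in the same way.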

A much more generic error is also printed during these flow failures:

Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.

Example:

from prefect import flow, task

@task
def say_hello(name):
    print(f"Hello, {name}!")

@flow
def example_flow():
    future = say_hello.submit(name="Marvin")
    future.wait()

example_flow()

Here is a simple flow which replicates my problem:

from time import sleep
from random import randint
from prefect import task, flow
from prefect.runtime import task_run


def generate_task_run_name() -> str:
    return f'{task_run.task_name} - input["val"]:  {task_run.parameters["input"]["val"]}'


@task(log_prints=True)
def pre_task() -> bool:
    return True


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_1(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val + 1
    print(f'task_1 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_2(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val * 10
    print(f'task_2 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_3(input: dict) -> None:
    input_val = input['val']
    print(f'task_3 - input["val"]: {input_val}')

    sleep(randint(5,20))
    return


@flow
def my_flow() -> None:
    pre_task()
    inputs: list = [
        {'val': 1, 'something_else': True}
        ,{'val': 2, 'something_else': True}
        ,{'val': 3, 'something_else': True}
        ,{'val': 4, 'something_else': True}
        ,{'val': 5, 'something_else': True}
        ,{'val': 6, 'something_else': True}
        ,{'val': 7, 'something_else': True}
        ,{'val': 8, 'something_else': True}
    ]

    result_1: dict = task_1.map(input=inputs)
    result_2: dict = task_2.map(input=result_1)
    final_result = task_3.map(input=result_2)

    final_result.wait()


if __name__ == '__main__':
    my_flow()

Version info (prefect version output)

Git commit:          0894bad4
Built:               Thu, Oct 10, 2024 10:17 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.2.9

Additional context

Here is the flow graph when the task name generators are disabled. The tasks have arbitrary IDs as names. I've attached this screenshot to stress my use case for chaining together mapped tasks. This is done for the sake of concurrency and input streaming. Note that downstream tasks kick off as soon as their inputs become available, resulting in the most efficient timing of individual task "threads"/"branches".
[Screenshot: flow graph of the chained mapped tasks, with arbitrary IDs as task names]

@dmichaelcarter added the bug label Oct 17, 2024
@zzstoatzz
Collaborator

hi @dmichaelcarter - thank you for the issue!

this turned out to be a little tricky due to how tasks are orchestrated in the 3.x engine

in the open PR linked above, I fix the template syntax for doing this

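from random import uniform
from time import sleep
from typing import Any

from prefect import task

# "{input[val]}" is a template string: it reads key "val" from the "input" parameter
@task(log_prints=True, task_run_name="running with input: {input[val]}")
def task_1(input: dict[str, Any]) -> dict[str, Any]:
    input["val"] += 1
    print(f"task_1 - input:  {input['val']}")

    sleep(uniform(1, 5))
    return input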

and propose an alternative to retrieving the parameters from the runtime module

from prefect.runtime import task_run

def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input: {parameters["input"]["val"]}'

I'm going to see if I can preserve access to runtime.task_run.parameters for backwards compatibility, but what do you think about these alternatives?
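
To illustrate the template syntax in isolation: assuming the task_run_name template follows Python's standard str.format rules (an assumption, since the thread does not show the rendering code), {input[val]} looks up the key val in the input parameter, with no quotes around the key. The parameters dict below is a hypothetical stand-in for a task's resolved parameters.

```python
# Plain-Python sketch; Prefect is not required to run this.
template = "running with input: {input[val]}"

# Hypothetical stand-in for the resolved parameters of one mapped task run.
parameters = {"input": {"val": 3, "something_else": True}}

# Inside a format field, [val] is a string-key lookup, so the key is unquoted.
rendered = template.format(**parameters)
print(rendered)  # running with input: 3
```

Writing the key with quotes inside the template, as in {input["val"]}, would look up a key that literally contains the quote characters and raise a KeyError.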

@dmichaelcarter
Author

@zzstoatzz Not sure I totally follow this alternative. How would I pass parameters into generate_task_run_name()?

@zzstoatzz
Collaborator

zzstoatzz commented Oct 23, 2024

hi @dmichaelcarter - you wouldn't have to. as the author of generate_task_run_name, you just assume that prefect passes in the resolved parameters dict, so you can use it to generate your name as desired

https://github.com/PrefectHQ/prefect/pull/15773/files#r1813200042

example of how you use this

@dmichaelcarter
Author

@zzstoatzz okay, so the alternative you are proposing is to use this method?

@dmichaelcarter
Author

@zzstoatzz sorry, I think I misunderstood you. I can still use my original task_run method, but I just need to add the specific task arg name to the generator function?

@zzstoatzz
Collaborator

zzstoatzz commented Oct 23, 2024

I can still use my original task_run method, but I just need to add the specific task arg name to the generator function?

yes! we're just changing where you grab that parameters dict from

you can use either the "template string" strategy or the new DX I'll illustrate below - the test I linked shows that both work

I'll use your original example against main for reference. the only change I need to make is that, instead of reading prefect.runtime.task_run.parameters, I assume that prefect will pass me the right parameters if I define my custom function to accept parameters

so all I'm changing is the following

before

def generate_task_run_name() -> str:
    return f'{task_run.task_name} - input["val"]:  {task_run.parameters["input"]["val"]}'

after

def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input["val"]:  {parameters["input"]["val"]}'

and prefect will make sure your parameters are resolved and passed to generate_task_run_name when it's time to render the custom name

i.e. prefect will inject parameters if and only if you set task_run_name to a callable that accepts a parameters kwarg
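
One way to picture the "if and only if" rule is signature inspection. The sketch below is not Prefect's implementation; render_task_run_name is a hypothetical helper that only shows how a caller could decide whether to pass parameters, using the standard library's inspect module.

```python
import inspect
from typing import Any, Callable


def render_task_run_name(name_fn: Callable[..., str], parameters: dict[str, Any]) -> str:
    """Hypothetical helper: pass `parameters` only if name_fn declares it."""
    if "parameters" in inspect.signature(name_fn).parameters:
        return name_fn(parameters=parameters)
    return name_fn()


def name_without_parameters() -> str:
    # Old-style generator: takes no arguments, so nothing is injected.
    return "static-name"


def name_with_parameters(parameters: dict) -> str:
    # New-style generator: receives the resolved parameters dict.
    return f'input["val"]: {parameters["input"]["val"]}'


resolved = {"input": {"val": 2, "something_else": True}}
print(render_task_run_name(name_without_parameters, resolved))
print(render_task_run_name(name_with_parameters, resolved))
```

Under this scheme, existing zero-argument generators keep working unchanged, and a generator opts in simply by declaring a parameters argument.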

Your original example using the new DX
from time import sleep
from random import randint
from prefect import task, flow
from prefect.runtime import task_run


def generate_task_run_name(parameters: dict) -> str:
    return f'{task_run.task_name} - input["val"]:  {parameters["input"]["val"]}'


@task(log_prints=True)
def pre_task() -> bool:
    return True


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_1(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val + 1
    print(f'task_1 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_2(input: dict) -> dict:
    input_val = input['val']
    input['val'] = input_val * 10
    print(f'task_2 - input["val"]:  {input_val}, output:  {input}')

    sleep(randint(5,20))
    return input


@task(log_prints=True, task_run_name=generate_task_run_name)
def task_3(input: dict) -> None:
    input_val = input['val']
    print(f'task_3 - input["val"]: {input_val}')

    sleep(randint(5,20))
    return


@flow
def my_flow() -> None:
    pre_task()
    inputs: list = [
        {'val': 1, 'something_else': True}
        ,{'val': 2, 'something_else': True}
        ,{'val': 3, 'something_else': True}
        ,{'val': 4, 'something_else': True}
        ,{'val': 5, 'something_else': True}
        ,{'val': 6, 'something_else': True}
        ,{'val': 7, 'something_else': True}
        ,{'val': 8, 'something_else': True}
    ]

    result_1: dict = task_1.map(input=inputs)
    result_2: dict = task_2.map(input=result_1)
    final_result = task_3.map(input=result_2)

    final_result.wait()


if __name__ == '__main__':
    my_flow()
