-
Hi @ame307! Prefect 1.0 did not allow for dynamic flow definitions, but I think I understand what you're getting at. What you are calling a "static definition" in 2.0 is really just a statically importable function with a static signature: the contents of that function can change as often as you'd like without having to recreate your deployment object. In your case, my guess is that the definition of this function would change depending on the parameters you pass (and in 1.0, I'm guessing you registered a new flow every time you wanted to run something with a modification). In Prefect 2.0, the only things that need to be statically defined are:
In Prefect 1.0, you had to define the full execution DAG for your Flow before running it even once. That is not true in 2.0: each run of your function can submit new tasks, and you are not required to prepare the full DAG of your execution up front, because Prefect builds it for you automatically on each run. To emphasize this point, your function could look like this:

    import random

    from prefect import flow, task
    from prefect.runtime import flow_run

    @task(task_run_name=f"Random number between [0, {flow_run.parameters.get('max')}]")
    def produce_random_number(max: int):
        return random.randint(0, max)

    @flow
    def random_flow(max: int, loops: int):
        for _ in range(loops):
            produce_random_number(max)

You are correct that we no longer store pickled objects and instead use importable functions. This actually makes your flow definitions more dynamic, because the script in which the function is defined is re-run on every deployment run, which lets you change things on the fly. In the example above, the task run name would be frozen if the flow were pickled, but with the file-based approach it is computed fresh on each run. If you can create a reproducible example of the issue with your cythonized file setup, we'll definitely investigate!
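To make the "script is re-run on every deployment run" point concrete, here is a standard-library-only sketch of loading a function from a `path/to/file.py:function_name` entrypoint with `importlib`. It is not Prefect's actual loader; `load_entrypoint`, `demo_flow.py`, `DEMO_MAX`, and `RUN_LABEL` are hypothetical names used only for illustration. Because each load executes the file as a fresh module, a top-level expression (standing in for the `task_run_name` f-string above) picks up whatever the environment holds at load time.

```python
import importlib.util
import os
import tempfile
from pathlib import Path
from typing import Any, Callable, List


def load_entrypoint(entrypoint: str) -> Callable[..., Any]:
    """Load 'path/to/file.py:name' by executing the file as a brand-new module."""
    path_str, _, attr = entrypoint.rpartition(":")
    path = Path(path_str)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    # Re-runs every top-level statement in the file, including expressions
    # used as decorator arguments, so they see the current environment.
    spec.loader.exec_module(module)
    return getattr(module, attr)


# Hypothetical stand-in for a flow file; RUN_LABEL plays the role of the
# task_run_name f-string, evaluated whenever the file is executed.
MODULE_SOURCE = "\n".join([
    "import os",
    "",
    "RUN_LABEL = 'max=' + os.environ.get('DEMO_MAX', 'unset')",
    "",
    "def describe():",
    "    return RUN_LABEL",
    "",
])


def run_demo() -> List[str]:
    """Load the same entrypoint twice under different environments."""
    labels = []
    try:
        with tempfile.TemporaryDirectory() as tmp:
            flow_file = Path(tmp) / "demo_flow.py"
            flow_file.write_text(MODULE_SOURCE)
            for value in ("10", "99"):
                os.environ["DEMO_MAX"] = value
                describe = load_entrypoint(f"{flow_file}:describe")
                labels.append(describe())
    finally:
        os.environ.pop("DEMO_MAX", None)
    return labels


print(run_demo())  # ['max=10', 'max=99']
```

A serialized copy of `describe` would, roughly speaking, carry whatever `RUN_LABEL` held when it was serialized; loading by reference is what lets the value differ between runs.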
-
Hi @cicdw! Thank you for the detailed answer; I truly appreciate it. Some of my statements have been misunderstood, so let me try to clarify them. What I call a "static definition" is exactly what you also mention: the statically importable function (or a Flow instance, which can also be created directly from the Flow class, since the decorator simply wraps the function in an instance of that class). My problem with this approach is that I cannot wrap my flow function or instance in a class. Why is that a problem? As I mentioned previously, I was able to create my flow dynamically, even within a class. In Prefect 1.0 I could do the following:

    from prefect import Flow, Client, Task, Parameter
    from my_tasks import MyTask1, MyTask2


    class MyApp:
        def __init__(self):
            self.config = self._load_config()

        def start(self):
            # Build the flow from configuration loaded at application start-up
            flow = MyFlowBuilder.get_flow(
                arg1=self.config["arg1"],
                arg2=self.config["arg2"],
                executor_config=self.config["executor_config"],
                storage_config=self.config["storage_config"],
                run_config=self.config["run_config"],
            )
            client = Client(api_server=self.config["api_server"])
            flow_id = client.register(
                flow=flow,
                project_name=self.config["project_name"],
                set_schedule_active=self.config["set_schedule_active"],
            )
            client.create_flow_run(
                flow_id=flow_id,
                context=self.config["context"],
                parameters=self.config["flow_params"],
                run_config=self.config["run_config"],
            )

        def _load_config(self):
            # some logic here
            ...


    class MyFlowBuilder:
        def __init__(self, arg1, arg2, **kwargs):
            self.arg1 = arg1
            self.arg2 = arg2
            self.kwargs = kwargs

        def _get_flow(self, executor_config, storage_config, run_config):
            flow = Flow("my-flow")
            flow.executor = self._init_executor(executor_config)
            flow.storage = self._init_storage(storage_config)
            flow.run_config = self._init_run_config(run_config)
            flow.add_task(Parameter("flow_arg1"))
            flow.add_task(Parameter("flow_arg2"))
            # Tasks are configured with builder members when they are constructed
            flow.add_task(MyTask1(self.arg1))
            flow.add_task(MyTask2(self.arg2))
            ...
            return flow

        @classmethod
        def get_flow(cls, arg1, arg2, executor_config, storage_config, run_config, **kwargs):
            flow_builder = cls(arg1=arg1, arg2=arg2, **kwargs)
            return flow_builder._get_flow(
                executor_config=executor_config,
                storage_config=storage_config,
                run_config=run_config,
            )

        def _init_executor(self, executor_config):
            # some logic here
            ...

        def _init_storage(self, storage_config):
            # some logic here
            ...

        def _init_run_config(self, run_config):
            # some logic here
            ...


    if __name__ == "__main__":
        MyApp().start()

As you can see in the example, I use members of the class to initialize my tasks. This means the created flow object contains these values statically, but I provide them dynamically when the flow is created. I understand that in Prefect 2.0 I could achieve the same result by passing these values as flow parameters. Saying "deployment time" was misleading on my part: I was referring to deploying the application, i.e. creating the Kubernetes pod where my code will run, not to the flow run. So my problem is that these values,

Another problem: let's assume I need a value in the flow that is determined at flow deployment time and computed by some heavy logic. In Prefect 2.0, to provide this value to the flow, I would have to either include this logic in the flow itself (which I don't want) or pass the value as a parameter on every run. I could modify my previous example like this:

    # ... (imports and MyApp unchanged from the previous example)
    class MyFlowBuilder:
        def __init__(self, arg1, arg2, **kwargs):
            self.arg1 = arg1
            self.arg2 = arg2
            self.kwargs = kwargs
            # Computed once, when the flow is built
            self.value = self._get_my_value_by_complex_calculation()

        def _get_my_value_by_complex_calculation(self):
            # some logic here
            ...

        def _get_flow(self, executor_config, storage_config, run_config):
            flow = Flow("my-flow")
            # ... (executor, storage, run config and parameters as before)
            flow.add_task(MyTask2(self.arg2, self.value))
            # ... (remaining tasks as before)
            return flow

Another concern: in Prefect 2.0, I don't yet see how I should provide parameters in a case like the following:

    from prefect import flow

    def get_my_task_runner(params):
        # some logic here
        pass

    @flow(
        name="my-flow",
        task_runner=get_my_task_runner(params),  # where would `params` come from?
    )
    def my_flow_definition():
        pass

How can I provide the

Regarding the cythonization problem, I am getting the following error whenever the file where I have the (prefect==2.8.6)
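One general way around decorator arguments being evaluated at import time is to build the configured component inside the function, once the parameters are actually known. The sketch below shows that pattern with the standard library's `concurrent.futures.ThreadPoolExecutor` standing in for a task runner; it is not a Prefect task runner, and `get_my_task_runner` and the `max_workers` key are hypothetical here. For Prefect itself, 2.x flows have a `with_options` method that returns a copy of the flow with overridden settings (including `task_runner`); that is worth checking against the 2.8.6 docs, keeping in mind that a deployment still needs the resulting flow to be importable at module level.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Dict, List


def get_my_task_runner(params: Dict[str, Any]) -> Executor:
    # Hypothetical: choose the pool size from runtime parameters.
    return ThreadPoolExecutor(max_workers=params.get("max_workers", 2))


def my_flow_definition(items: List[int], params: Dict[str, Any]) -> List[int]:
    # The runner is created here, at call time, when `params` exists,
    # rather than in a decorator argument evaluated at import time.
    with get_my_task_runner(params) as runner:
        # Executor.map returns results in input order.
        return list(runner.map(lambda x: x * x, items))


print(my_flow_definition([1, 2, 3], {"max_workers": 4}))  # [1, 4, 9]
```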
Thank you in advance for your answer!
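On the earlier concern about a value computed by heavy logic at deployment time: the idea of computing it once, storing it as a default parameter, and merging per-run overrides on top can be sketched with the standard library alone. As far as I know, a Prefect 2.x `Deployment` has a `parameters` field that would play the role of the stored defaults here, but treat that mapping as an assumption to verify. `compute_expensive_value`, `build_deployment_spec`, and `resolve_run_parameters` are hypothetical names.

```python
import json
from typing import Any, Dict, Optional


def compute_expensive_value() -> int:
    # Hypothetical stand-in for the heavy logic run once at deployment time.
    return sum(i * i for i in range(1000))


def build_deployment_spec(name: str) -> str:
    """Run the expensive logic once and store its result as a default parameter."""
    spec = {"name": name, "parameters": {"value": compute_expensive_value()}}
    return json.dumps(spec)


def resolve_run_parameters(
    spec_json: str, overrides: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Merge stored defaults with per-run overrides; nothing is recomputed."""
    params = dict(json.loads(spec_json)["parameters"])
    params.update(overrides or {})
    return params


spec = build_deployment_spec("my-flow")
print(resolve_run_parameters(spec))                # {'value': 332833500}
print(resolve_run_parameters(spec, {"value": 1}))  # {'value': 1}
```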
-
Hi,
Previously, I had a working environment with Prefect 1.0, and recently I started exploring Prefect 2.0. To me, the new solution looks far more rigid than the previous one, and I would appreciate some clarification.
In 1.0, I was able to create flows fully dynamically: I could create an empty flow, add tasks to it, and set its configuration later on. It was very flexible. In 2.0, this no longer seems possible. Moreover, it looks like I must have a static flow definition at deploy time, because the flow definition must be importable.
To me, this is a huge step back. Is my current understanding right, or am I missing something? If we take the following example:
When I used similar code, I thought that `flow=my_flow` and the function or object I passed as a reference with `entrypoint` were going to be used for different purposes. After looking at the code, I realized that is not the case. `build_from_flow` simply constructs a `Deployment`, which does not need a flow instance, only some details about the flow. The real flow definition is resolved via the reference passed through `entrypoint`. If I am right, this means I have no way to instantiate the flow with parameters; I must have a static flow definition that Prefect can import as a module. Of course, I can pass all the dynamic values through flow parameters, but it is still a step back. Another problem is that I won't be able to create the `task_runner` dynamically either.
Furthermore, because it uses `importlib` and `inspect` instead of pickling (if I am right; I didn't take a deep look into this part), I am getting errors when I use cythonized files. This is really bad. Am I missing something, or are my statements right? If it is the latter, I really don't see what the advantages of 2.0 are.
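Regarding the cythonized files: one plausible cause (an assumption, since the actual error is not shown in this thread) is that source-file introspection with `inspect` only works on plain Python objects. The sketch below uses a built-in function as a stand-in for a compiled, for example Cython-generated, callable: `inspect.getsourcefile` raises `TypeError` for it, while it returns a path for a regular Python module. `source_file_or_none` is a hypothetical helper that treats every failure as "no source available".

```python
import inspect
from typing import Any, Optional


def source_file_or_none(obj: Any) -> Optional[str]:
    """Return the Python source file defining obj, or None if there is none."""
    try:
        # May itself return None when no Python source file can be located.
        return inspect.getsourcefile(obj)
    except TypeError:
        # Raised for objects that are not plain Python modules, classes,
        # functions, methods, frames, tracebacks, or code objects,
        # e.g. built-in functions implemented in C.
        return None


print(source_file_or_none(len))      # None: built-in, no Python source
print(source_file_or_none(inspect))  # a path ending in inspect.py
```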