-
Is there any way to make the following situation work with Prefect v2?

# sample project
# A: collect data from some files and save it somewhere.
from prefect import flow

@flow(name="A")
def collect_and_save():
    data_a = fetch_some_data()
    data_b = fetch_other_data()
    data_save(data_a, data_b)

# other project
# B: detects that A is complete and performs an analysis from that file.
from prefect import flow

@flow(name="B")
def analyse_datas():
    data_a, data_b = get_datas()
    data_c = some_analyse(data_a, data_b)
    write_something(data_c)

# with Flow("A").when_completed():
#     analyse_datas()

For reference, the following page explained this for Prefect v1. It is exactly what we want to do. It doesn't matter whether the dependencies are stated in the code or specified in the web UI. Thanks in advance.
Replies: 4 comments 4 replies
-
https://docs.prefect.io/concepts/flows/#composing-flows Try using the wait_for parameter in Prefect 2. I haven't tested it, though.
-
It seems like you can use the recently introduced automations. In the cloud UI, the left menu has an Automations tab where you can build a config that triggers one flow when another flow reaches the Completed state.
-
Disregarding "We are assuming a case where the function entity cannot be referred to in the first place" for a moment, I think this works:
-
I was able to force this to work using the Orion API.

import asyncio
from datetime import datetime

from prefect import flow
from prefect.client.orion import get_client
from prefect.orion.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateName,
)


async def wait_flow_end(
    deploy_name: str,
    start_time=None,
    max_loop_time: int = 3600,
    retry_span: int = 10,
):
    """Poll Orion until a flow run of `deploy_name` completes; return False on timeout."""
    # Take the start time at call time rather than at function definition time.
    if start_time is None:
        start_time = datetime.now()
    current_t = 0
    async with get_client() as client:
        while current_t < max_loop_time:
            # Look for a COMPLETED run of the deployment that started after start_time.
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateName(any_=["COMPLETED"])
                    ),
                    start_time=FlowRunFilterStartTime(after_=start_time),
                ),
                deployment_filter=DeploymentFilter(
                    name=DeploymentFilterName(any_=[deploy_name])
                ),
                limit=1,
            )
            if flow_runs:
                print("flow-run completed!")
                return True
            current_t += retry_span
            print(f"wait flow-run complete of deploy:{deploy_name} ({current_t}/{max_loop_time} sec)")
            # Non-blocking sleep so the event loop is not stalled between polls.
            await asyncio.sleep(retry_span)
    return False


@flow(name="test_B")
async def testFlow_B():
    print("Hello, Flow B!")


@flow
async def testFlow_runB_afterA():
    # Run B only if a completed run of deployment "test_A" was observed.
    if await wait_flow_end("test_A"):
        await testFlow_B()


if __name__ == "__main__":
    asyncio.run(testFlow_runB_afterA())

The output is then as follows. (I have an Orion server running on the same host, with a deployment named
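The polling approach in the reply above can be separated from Prefect entirely, which makes the timeout logic easy to try out on its own. The following is a minimal sketch and not part of the original reply: `wait_until`, `check`, and `make_fake_check` are hypothetical names, and `check` stands in for whatever query reports completion (for example, the `read_flow_runs` call). It waits with `asyncio.sleep`, so other coroutines can keep running between polls.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until(
    check: Callable[[], Awaitable[bool]],
    timeout: float = 3600,
    interval: float = 10,
) -> bool:
    """Poll `check` until it returns True or `timeout` seconds have been waited.

    `check` is a hypothetical stand-in for the real completion query.
    """
    waited = 0.0
    while waited < timeout:
        if await check():
            return True
        # asyncio.sleep yields to the event loop instead of blocking it.
        await asyncio.sleep(interval)
        waited += interval
    return False


def make_fake_check(succeed_on_call: int) -> Callable[[], Awaitable[bool]]:
    """Build a fake check that reports completion on the n-th call (demo only)."""
    calls = 0

    async def check() -> bool:
        nonlocal calls
        calls += 1
        return calls >= succeed_on_call

    return check


if __name__ == "__main__":
    done = asyncio.run(wait_until(make_fake_check(3), timeout=1, interval=0.01))
    print(done)
```

In a real flow, `check` would wrap the Orion query, and the downstream flow would be called only when `wait_until` returns True.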