Skip to content

Commit

Permalink
Add parent flow run and deployment ids to runtime (#10204)
Browse files Browse the repository at this point in the history
  • Loading branch information
zangell44 authored Jul 13, 2023
1 parent d5b14b8 commit eb096aa
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 2 deletions.
50 changes: 49 additions & 1 deletion src/prefect/runtime/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext, TaskRunContext

__all__ = ["id", "tags", "scheduled_start_time", "name", "flow_name", "parameters"]
__all__ = [
"id",
"tags",
"scheduled_start_time",
"name",
"flow_name",
"parameters",
"parent_flow_run_id",
"parent_deployment_id",
]


type_cast = {
Expand Down Expand Up @@ -83,6 +92,11 @@ async def _get_flow_run(flow_run_id):
return await client.read_flow_run(flow_run_id)


async def _get_task_run(task_run_id):
async with get_client() as client:
return await client.read_task_run(task_run_id)


async def _get_flow_from_run(flow_run_id):
async with get_client() as client:
flow_run = await client.read_flow_run(flow_run_id)
Expand Down Expand Up @@ -176,11 +190,45 @@ def get_parameters() -> Dict[str, Any]:
return {}


def get_parent_flow_run_id() -> Optional[str]:
flow_run_ctx = FlowRunContext.get()
run_id = get_id()
if flow_run_ctx is not None:
parent_task_run_id = flow_run_ctx.flow_run.parent_task_run_id
elif run_id is not None:
flow_run = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_run, run_id)
).result()
parent_task_run_id = flow_run.parent_task_run_id
else:
parent_task_run_id = None

if parent_task_run_id is not None:
parent_task_run = from_sync.call_soon_in_loop_thread(
create_call(_get_task_run, parent_task_run_id)
).result()
return parent_task_run.flow_run_id
return None


def get_parent_deployment_id() -> Dict[str, Any]:
parent_flow_run_id = get_parent_flow_run_id()
if parent_flow_run_id is None:
return None

parent_flow_run = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_run, parent_flow_run_id)
).result()
return parent_flow_run.deployment_id if parent_flow_run else None


FIELDS = {
"id": get_id,
"tags": get_tags,
"scheduled_start_time": get_scheduled_start_time,
"name": get_name,
"flow_name": get_flow_name,
"parameters": get_parameters,
"parent_flow_run_id": get_parent_flow_run_id,
"parent_deployment_id": get_parent_deployment_id,
}
213 changes: 212 additions & 1 deletion tests/runtime/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pendulum
import pytest

from prefect import flow, states, tags
from prefect import flow, states, tags, task
from prefect.client.schemas import FlowRun, TaskRun
from prefect.context import FlowRunContext, TaskRunContext
from prefect.flows import Flow
Expand Down Expand Up @@ -276,3 +276,214 @@ def my_flow(x):

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=flow_run_id)
assert flow_run.parameters == {"x": {"y": 1}}


class TestParentFlowRunId:
async def test_parent_flow_run_id_is_attribute(self):
assert "parent_flow_run_id" in dir(flow_run)

async def test_parent_flow_run_id_is_empty_when_not_set(self):
assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_returns_parent_flow_run_id_when_present_dynamically(
self, prefect_client
):
assert flow_run.parent_flow_run_id is None

with FlowRunContext.construct(
flow_run=FlowRun.construct(parent_task_run_id=None),
flow=Flow(fn=lambda: None, name="foo"),
):
assert flow_run.parent_flow_run_id is None

parent_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="foo2"), parameters={"x": "foo", "y": "bar"}
)

@task
def foo():
return 1

parent_task_run = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run.id
)

with FlowRunContext.construct(
flow_run=FlowRun.construct(parent_task_run_id=parent_task_run.id),
flow=Flow(fn=lambda: None, name="foo3"),
):
assert (
flow_run.parent_flow_run_id
== parent_flow_run.id
== parent_task_run.flow_run_id
)

assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
):
assert flow_run.parent_flow_run_id is None

parent_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="parent"),
parameters={"x": "foo", "y": "bar"},
)

@task
def foo():
return 1

parent_task_run = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run.id
)

child_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="child"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=parent_task_run.id,
)

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(child_flow_run.id))
assert (
flow_run.parent_flow_run_id
== parent_flow_run.id
== parent_task_run.flow_run_id
)

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(parent_flow_run.id))
assert flow_run.parent_flow_run_id is None


class TestParentDeploymentId:
async def test_parent_deployment_id_is_attribute(self):
assert "parent_deployment_id" in dir(flow_run)

async def test_parent_deployment_id_is_empty_when_not_set(self):
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_returns_parent_deployment_id_when_present_dynamically(
self, prefect_client
):
assert flow_run.parent_deployment_id is None

@flow
def parent():
return 1

@task
def foo():
return 1

parent_flow_id = await prefect_client.create_flow(parent)

# Parent flow run that does not have a deployment
parent_flow_run_no_deployment = await prefect_client.create_flow_run(
flow=parent,
)
parent_task_run_no_deployment = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run_no_deployment.id
)
with FlowRunContext.construct(
flow_run=FlowRun.construct(
parent_task_run_id=parent_task_run_no_deployment.id
),
flow=Flow(fn=lambda: None, name="child-flow-no-deployment"),
):
assert flow_run.parent_deployment_id is None

# Parent flow run that does have a deployment
parent_flow_deployment_id = await prefect_client.create_deployment(
flow_id=parent_flow_id,
name="example",
)
parent_flow_run_with_deployment = (
await prefect_client.create_flow_run_from_deployment(
deployment_id=parent_flow_deployment_id,
)
)
parent_task_run_with_deployment = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run_with_deployment.id
)
with FlowRunContext.construct(
flow_run=FlowRun.construct(
parent_task_run_id=parent_task_run_with_deployment.id
),
flow=Flow(fn=lambda: None, name="child-flow-with-parent-deployment"),
):
assert flow_run.parent_deployment_id == parent_flow_deployment_id

# No parent flow run
with FlowRunContext.construct(
flow_run=FlowRun.construct(parent_task_run_id=None),
flow=Flow(fn=lambda: None, name="child-flow-no-parent-task-run"),
):
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
):
assert flow_run.parent_deployment_id is None

@flow
def parent():
return 1

@task
def foo():
return 1

parent_flow_id = await prefect_client.create_flow(parent)

# Parent flow run that does not have a deployment
parent_flow_run_no_deployment = await prefect_client.create_flow_run(
flow=parent,
)

parent_task_run_no_deployment = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run_no_deployment.id
)

child_flow_run_no_deployment = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="child-no-deploy"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=parent_task_run_no_deployment.id,
)

monkeypatch.setenv(
name="PREFECT__FLOW_RUN_ID", value=str(child_flow_run_no_deployment.id)
)
assert flow_run.parent_deployment_id is None

# Parent flow run that does have a deployment
parent_flow_deployment_id = await prefect_client.create_deployment(
flow_id=parent_flow_id,
name="example",
)

parent_flow_run_with_deployment = (
await prefect_client.create_flow_run_from_deployment(
deployment_id=parent_flow_deployment_id,
)
)

parent_task_run_with_deployment = await prefect_client.create_task_run(
task=foo, dynamic_key="1", flow_run_id=parent_flow_run_with_deployment.id
)

child_flow_run_with_deployment = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="child-deploy"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=parent_task_run_with_deployment.id,
)

monkeypatch.setenv(
name="PREFECT__FLOW_RUN_ID", value=str(child_flow_run_with_deployment.id)
)
assert flow_run.parent_deployment_id == parent_flow_deployment_id

# No parent flow run
monkeypatch.setenv(
name="PREFECT__FLOW_RUN_ID", value=str(parent_flow_run_no_deployment.id)
)
assert flow_run.parent_deployment_id is None

0 comments on commit eb096aa

Please sign in to comment.