Skip to content

Commit

Permalink
First order deep merge of flow run overrides with deployment overrides (
Browse files Browse the repository at this point in the history
  • Loading branch information
cicdw authored Aug 14, 2024
1 parent 318eaf2 commit 51c3f29
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,12 @@ async def _get_configuration(

deployment_vars = deployment.job_variables or {}
flow_run_vars = flow_run.job_variables or {}
job_variables = {**deployment_vars, **flow_run_vars}
job_variables = {**deployment_vars}

# merge environment variables carefully, otherwise full override
if isinstance(job_variables.get("env"), dict):
job_variables["env"].update(flow_run_vars.pop("env", {}))
job_variables.update(flow_run_vars)

configuration = await self.job_configuration.from_template_and_values(
base_job_template=self._work_pool.base_job_template,
Expand Down
34 changes: 33 additions & 1 deletion tests/workers/test_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)
from prefect.flows import flow
from prefect.server import models
from prefect.server.schemas.core import Flow
from prefect.server.schemas.core import Deployment, Flow
from prefect.server.schemas.responses import DeploymentResponse
from prefect.server.schemas.states import StateType
from prefect.settings import (
Expand Down Expand Up @@ -2073,3 +2073,35 @@ async def test_worker_last_polled_health_check(

# cleanup mock
pendulum.set_test_now()


async def test_env_merge_logic_is_deep(prefect_client, session, flow):
deployment = await models.deployments.create_deployment(
session=session,
deployment=Deployment(
name="env-testing",
tags=["test"],
flow_id=flow.id,
schedule=None,
path="./subdir",
entrypoint="/file.py:flow",
parameter_openapi_schema=None,
job_variables={"env": {"test-var": "foo"}},
),
)
await session.commit()

flow_run = await prefect_client.create_flow_run_from_deployment(
deployment.id,
state=Pending(),
job_variables={"env": {"another-var": "boo"}},
)
async with WorkerTestImpl(
name="test",
work_pool_name="test-work-pool",
) as worker:
await worker.sync_with_backend()
config = await worker._get_configuration(flow_run)

assert config.env["test-var"] == "foo"
assert config.env["another-var"] == "boo"

0 comments on commit 51c3f29

Please sign in to comment.