Skip to content

Commit

Permalink
Deep merge env vars (#14932)
Browse files Browse the repository at this point in the history
  • Loading branch information
cicdw authored Aug 14, 2024
1 parent 0117fe5 commit a257267
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,12 @@ async def _get_configuration(

deployment_vars = deployment.job_variables or {}
flow_run_vars = flow_run.job_variables or {}
job_variables = {**deployment_vars, **flow_run_vars}
job_variables = {**deployment_vars}

# merge environment variables carefully, otherwise full override
if isinstance(job_variables.get("env"), dict):
job_variables["env"].update(flow_run_vars.pop("env", {}))
job_variables.update(flow_run_vars)

configuration = await self.job_configuration.from_template_and_values(
base_job_template=self._work_pool.base_job_template,
Expand Down
33 changes: 32 additions & 1 deletion tests/workers/test_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from prefect.flows import flow
from prefect.server import models
from prefect.server.schemas.core import Flow, WorkPool
from prefect.server.schemas.core import Deployment, Flow, WorkPool
from prefect.server.schemas.responses import DeploymentResponse
from prefect.settings import (
PREFECT_API_URL,
Expand Down Expand Up @@ -1644,3 +1644,34 @@ def create_run_with_deployment(state):

worker.run.assert_awaited_once()
assert worker.run.call_args[1]["flow_run"].id == flow_run.id


async def test_env_merge_logic_is_deep(prefect_client, session, flow):
deployment = await models.deployments.create_deployment(
session=session,
deployment=Deployment(
name="env-testing",
tags=["test"],
flow_id=flow.id,
path="./subdir",
entrypoint="/file.py:flow",
parameter_openapi_schema={},
job_variables={"env": {"test-var": "foo"}},
),
)
await session.commit()

flow_run = await prefect_client.create_flow_run_from_deployment(
deployment.id,
state=Pending(),
job_variables={"env": {"another-var": "boo"}},
)
async with WorkerTestImpl(
name="test",
work_pool_name="test-work-pool",
) as worker:
await worker.sync_with_backend()
config = await worker._get_configuration(flow_run)

assert config.env["test-var"] == "foo"
assert config.env["another-var"] == "boo"

0 comments on commit a257267

Please sign in to comment.