diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index fdce809e30ae..b3ee5a1f4c3a 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -930,7 +930,12 @@ async def _get_configuration( deployment_vars = deployment.job_variables or {} flow_run_vars = flow_run.job_variables or {} - job_variables = {**deployment_vars, **flow_run_vars} + job_variables = {**deployment_vars} + + # merge environment variables carefully, otherwise full override + if isinstance(job_variables.get("env"), dict): + job_variables["env"].update(flow_run_vars.pop("env", {})) + job_variables.update(flow_run_vars) configuration = await self.job_configuration.from_template_and_values( base_job_template=self._work_pool.base_job_template, diff --git a/tests/workers/test_base_worker.py b/tests/workers/test_base_worker.py index 97a1fc9407e6..7a91d6b6b530 100644 --- a/tests/workers/test_base_worker.py +++ b/tests/workers/test_base_worker.py @@ -18,7 +18,7 @@ ) from prefect.flows import flow from prefect.server import models -from prefect.server.schemas.core import Flow, WorkPool +from prefect.server.schemas.core import Deployment, Flow, WorkPool from prefect.server.schemas.responses import DeploymentResponse from prefect.settings import ( PREFECT_API_URL, @@ -1644,3 +1644,34 @@ def create_run_with_deployment(state): worker.run.assert_awaited_once() assert worker.run.call_args[1]["flow_run"].id == flow_run.id + + +async def test_env_merge_logic_is_deep(prefect_client, session, flow): + deployment = await models.deployments.create_deployment( + session=session, + deployment=Deployment( + name="env-testing", + tags=["test"], + flow_id=flow.id, + path="./subdir", + entrypoint="/file.py:flow", + parameter_openapi_schema={}, + job_variables={"env": {"test-var": "foo"}}, + ), + ) + await session.commit() + + flow_run = await prefect_client.create_flow_run_from_deployment( + deployment.id, + state=Pending(), + job_variables={"env": {"another-var": "boo"}}, + ) + async with WorkerTestImpl( + name="test", + work_pool_name="test-work-pool", + ) as worker: + await worker.sync_with_backend() + config = await worker._get_configuration(flow_run) + + assert config.env["test-var"] == "foo" + assert config.env["another-var"] == "boo"