From 73cb8803153842b9092e27bb88f3dbc327a74d7e Mon Sep 17 00:00:00 2001
From: Owen Kephart
Date: Fri, 18 Oct 2024 15:10:33 -0700
Subject: [PATCH] Fix bug when switching from non-user-code to user-code

---
Review note: the asset daemon no longer pre-filters sensor states, so the
legacy-name migration in _copy_default_auto_materialize_sensor_states must
itself skip every state that is not both named
"default_auto_materialize_sensor" and handled by the asset daemon
(`name != ... or not (handled)`), otherwise ordinary sensor states would be
copied onto "default_automation_condition_sensor".

 .../dagster/dagster/_daemon/asset_daemon.py   | 52 ++++++++--------
 .../dagster/dagster/_daemon/sensor.py         |  5 --
 .../daemon_tests/definitions/simple_defs.py   | 12 ++++
 .../definitions/simple_non_user_code.py       | 17 ++++++
 .../definitions/simple_user_code.py           | 17 ++++++
 .../daemon_tests/test_e2e.py                  | 59 ++++++++++++++++++-
 6 files changed, 127 insertions(+), 35 deletions(-)
 create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_defs.py
 create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_non_user_code.py
 create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_user_code.py

diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py
index dd9f886d72c56..7f183578c54ac 100644
--- a/python_modules/dagster/dagster/_daemon/asset_daemon.py
+++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -493,16 +493,11 @@ def _run_iteration_impl(
             if not eligible_sensors_and_repos:
                 return
 
-            all_auto_materialize_states = {
+            all_sensor_states = {
                 sensor_state.selector_id: sensor_state
                 for sensor_state in instance.all_instigator_state(
                     instigator_type=InstigatorType.SENSOR
                 )
-                if (
-                    sensor_state.sensor_instigator_data
-                    and sensor_state.sensor_instigator_data.sensor_type
-                    and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
-                )
             }
 
             if not self._checked_migrations:
@@ -517,13 +512,11 @@ def _run_iteration_impl(
                         self._logger.info(
                             "Translating legacy cursor into a new cursor for each new automation policy sensor"
                         )
-                        all_auto_materialize_states = (
-                            self._create_initial_sensor_cursors_from_raw_cursor(
-                                instance,
-                                eligible_sensors_and_repos,
-                                all_auto_materialize_states,
-                                pre_sensor_cursor,
-                            )
+                        all_sensor_states = self._create_initial_sensor_cursors_from_raw_cursor(
+                            instance,
+                            eligible_sensors_and_repos,
+                            all_sensor_states,
+                            pre_sensor_cursor,
                         )
 
                     set_has_migrated_to_sensors(instance)
@@ -533,8 +526,8 @@ def _run_iteration_impl(
                     self._logger.info(
                         "Renaming any states corresponding to the legacy default name"
                     )
-                    all_auto_materialize_states = self._copy_default_auto_materialize_sensor_states(
-                        instance, all_auto_materialize_states
+                    all_sensor_states = self._copy_default_auto_materialize_sensor_states(
+                        instance, all_sensor_states
                     )
                     set_has_migrated_sensor_names(instance)
 
@@ -543,7 +536,7 @@ def _run_iteration_impl(
             for sensor, repo in eligible_sensors_and_repos:
                 selector_id = sensor.selector_id
                 if sensor.get_current_instigator_state(
-                    all_auto_materialize_states.get(selector_id)
+                    all_sensor_states.get(selector_id)
                 ).is_running:
                     sensors_and_repos.append((sensor, repo))
 
@@ -554,12 +547,12 @@ def _run_iteration_impl(
                     None,
                 )  # Represents that there's a single set of ticks with no underlying sensor
             )
-            all_auto_materialize_states = {}
+            all_sensor_states = {}
 
         for sensor, repo in sensors_and_repos:
             if sensor:
                 selector_id = sensor.selector.get_id()
-                auto_materialize_state = all_auto_materialize_states.get(selector_id)
+                auto_materialize_state = all_sensor_states.get(selector_id)
             else:
                 selector_id = None
                 auto_materialize_state = None
@@ -617,7 +610,7 @@ def _create_initial_sensor_cursors_from_raw_cursor(
         self,
         instance: DagsterInstance,
         sensors_and_repos: Sequence[Tuple[RemoteSensor, RemoteRepository]],
-        all_auto_materialize_states: Mapping[str, InstigatorState],
+        all_sensor_states: Mapping[str, InstigatorState],
         pre_sensor_cursor: AssetDaemonCursor,
     ) -> Mapping[str, InstigatorState]:
         start_status = (
@@ -645,7 +638,7 @@ def _create_initial_sensor_cursors_from_raw_cursor(
                 ),
             )
 
-            if all_auto_materialize_states.get(sensor.selector_id):
+            if all_sensor_states.get(sensor.selector_id):
                 instance.update_instigator_state(new_auto_materialize_state)
             else:
                 instance.add_instigator_state(new_auto_materialize_state)
@@ -657,16 +650,21 @@ def _create_initial_sensor_cursors_from_raw_cursor(
     def _copy_default_auto_materialize_sensor_states(
         self,
         instance: DagsterInstance,
-        all_auto_materialize_states: Mapping[str, InstigatorState],
+        all_sensor_states: Mapping[str, InstigatorState],
     ) -> Mapping[str, InstigatorState]:
         """Searches for sensors named `default_auto_materialize_sensor` and copies their state
         to a sensor in the same repository named `default_automation_condition_sensor`.
         """
-        result = dict(all_auto_materialize_states)
-
-        for instigator_state in all_auto_materialize_states.values():
-            # only migrate instigators with the name "default_auto_materialize_sensor"
-            if instigator_state.origin.instigator_name != "default_auto_materialize_sensor":
+        result = dict(all_sensor_states)
+
+        for instigator_state in all_sensor_states.values():
+            # only migrate instigators with the name "default_auto_materialize_sensor" that are
+            # handled by the asset daemon; every other sensor state is skipped untouched
+            if instigator_state.origin.instigator_name != "default_auto_materialize_sensor" or not (
+                instigator_state.sensor_instigator_data
+                and instigator_state.sensor_instigator_data.sensor_type
+                and instigator_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
+            ):
                 continue
             new_sensor_origin = instigator_state.origin._replace(
                 instigator_name="default_automation_condition_sensor"
@@ -679,7 +677,7 @@ def _copy_default_auto_materialize_sensor_states(
             )
             new_sensor_selector_id = new_sensor_origin.get_selector().get_id()
             result[new_sensor_selector_id] = new_auto_materialize_state
-            if all_auto_materialize_states.get(new_sensor_selector_id):
+            if all_sensor_states.get(new_sensor_selector_id):
                 instance.update_instigator_state(new_auto_materialize_state)
             else:
                 instance.add_instigator_state(new_auto_materialize_state)
diff --git a/python_modules/dagster/dagster/_daemon/sensor.py b/python_modules/dagster/dagster/_daemon/sensor.py
index d44a54a968348..7881698ddac22 100644
--- a/python_modules/dagster/dagster/_daemon/sensor.py
+++ b/python_modules/dagster/dagster/_daemon/sensor.py
@@ -371,11 +371,6 @@ def execute_sensor_iteration(
     all_sensor_states = {
         sensor_state.selector_id: sensor_state
         for sensor_state in instance.all_instigator_state(instigator_type=InstigatorType.SENSOR)
-        if not (  # filter out sensors state handled by asset daemon
-            sensor_state.sensor_instigator_data
-            and sensor_state.sensor_instigator_data.sensor_type
-            and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
-        )
     }
 
     tick_retention_settings = instance.get_tick_retention_settings(InstigatorType.SENSOR)
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_defs.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_defs.py
new file mode 100644
index 0000000000000..3b9783560da60
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_defs.py
@@ -0,0 +1,12 @@
+import dagster as dg
+
+
+@dg.asset
+def root() -> None: ...
+
+
+@dg.asset(deps=[root], automation_condition=dg.AutomationCondition.eager())
+def downstream() -> None: ...
+
+
+defs = dg.Definitions(assets=[root, downstream])
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_non_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_non_user_code.py
new file mode 100644
index 0000000000000..37c5b11611b21
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_non_user_code.py
@@ -0,0 +1,17 @@
+import dagster as dg
+
+
+def get_defs() -> dg.Definitions:
+    from .simple_defs import defs as simple_defs  # noqa
+
+    return dg.Definitions(
+        assets=simple_defs.assets,
+        sensors=[
+            dg.AutomationConditionSensorDefinition(
+                name="the_sensor", asset_selection="*", user_code=False
+            )
+        ],
+    )
+
+
+defs = get_defs()
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_user_code.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_user_code.py
new file mode 100644
index 0000000000000..835112cfe15ec
--- /dev/null
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/simple_user_code.py
@@ -0,0 +1,17 @@
+import dagster as dg
+
+
+def get_defs() -> dg.Definitions:
+    from .simple_defs import defs as simple_defs  # noqa
+
+    return dg.Definitions(
+        assets=simple_defs.assets,
+        sensors=[
+            dg.AutomationConditionSensorDefinition(
+                name="the_sensor", asset_selection="*", user_code=True
+            )
+        ],
+    )
+
+
+defs = get_defs()
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py
index 3afd061be02e5..c0e6bdfa59f45 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py
@@ -2,7 +2,7 @@
 import os
 import sys
 from contextlib import contextmanager
-from typing import AbstractSet, Mapping, Sequence, cast
+from typing import AbstractSet, Mapping, Optional, Sequence, cast
 
 import dagster._check as check
 import pytest
@@ -37,7 +37,9 @@
 from dagster._time import get_current_datetime
 
 
-def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin:
+def get_code_location_origin(
+    filename: str, location_name: Optional[str] = None
+) -> InProcessCodeLocationOrigin:
     return InProcessCodeLocationOrigin(
         loadable_target_origin=LoadableTargetOrigin(
             executable_path=sys.executable,
@@ -46,7 +48,7 @@ def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin:
             ),
             working_directory=os.getcwd(),
         ),
-        location_name=filename,
+        location_name=location_name or filename,
     )
 
 
@@ -521,3 +523,54 @@ def test_backfill_with_runs_and_checks() -> None:
         assert len(backfills) == 0
         runs = _get_runs_for_latest_ticks(context)
         assert len(runs) == 0
+
+
+def test_toggle_user_code() -> None:
+    with instance_for_test(
+        overrides={
+            "run_launcher": {
+                "module": "dagster._core.launcher.sync_in_memory_run_launcher",
+                "class": "SyncInMemoryRunLauncher",
+            },
+        }
+    ) as instance, get_threadpool_executor() as executor:
+        user_code_target = InProcessTestWorkspaceLoadTarget(
+            [get_code_location_origin("simple_user_code", location_name="simple")]
+        )
+        non_user_code_target = InProcessTestWorkspaceLoadTarget(
+            [get_code_location_origin("simple_non_user_code", location_name="simple")]
+        )
+
+        # start off with non user code target, just do the setup once
+        with create_test_daemon_workspace_context(
+            workspace_load_target=non_user_code_target, instance=instance
+        ) as context:
+            _setup_instance(context)
+
+        time = get_current_datetime()
+        # toggle back and forth between target types
+        for target in [non_user_code_target, user_code_target, non_user_code_target]:
+            with create_test_daemon_workspace_context(
+                workspace_load_target=target, instance=instance
+            ) as context:
+                time += datetime.timedelta(seconds=35)
+                with freeze_time(time):
+                    # first tick, nothing happened
+                    _execute_ticks(context, executor)
+                    runs = _get_runs_for_latest_ticks(context)
+                    assert len(runs) == 0
+
+                time += datetime.timedelta(seconds=35)
+                with freeze_time(time):
+                    # second tick, root gets updated
+                    instance.report_runless_asset_event(AssetMaterialization("root"))
+                    _execute_ticks(context, executor)
+                    runs = _get_runs_for_latest_ticks(context)
+                    assert runs[0].asset_selection == {AssetKey("downstream")}
+
+                time += datetime.timedelta(seconds=35)
+                with freeze_time(time):
+                    # third tick, don't kick off again
+                    _execute_ticks(context, executor)
+                    runs = _get_runs_for_latest_ticks(context)
+                    assert len(runs) == 0