diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index dd9f886d72c56..de6d128dab57c 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -330,10 +330,6 @@ def write(self) -> None: class AssetDaemon(DagsterDaemon): def __init__(self, settings: Mapping[str, Any], pre_sensor_interval_seconds: int): - self._initialized_evaluation_id = False - self._evaluation_id_lock = threading.Lock() - self._next_evaluation_id = None - self._pre_sensor_interval_seconds = pre_sensor_interval_seconds self._last_pre_sensor_submit_time = None @@ -360,46 +356,6 @@ def _get_print_sensor_name(self, sensor: Optional[RemoteSensor]) -> str: ) return f" for {sensor.name} in {repo_name}" - def _initialize_evaluation_id( - self, - instance: DagsterInstance, - ): - # Find the largest stored evaluation ID across all auto-materialize cursor - # to initialize the thread-safe evaluation ID counter - with self._evaluation_id_lock: - sensor_states = check.not_none(instance.schedule_storage).all_instigator_state( - instigator_type=InstigatorType.SENSOR - ) - - self._next_evaluation_id = 0 - for sensor_state in sensor_states: - if not ( - sensor_state.sensor_instigator_data - and sensor_state.sensor_instigator_data.sensor_type - and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon - ): - continue - - compressed_cursor = sensor_state.sensor_instigator_data.cursor - if compressed_cursor: - stored_evaluation_id = asset_daemon_cursor_from_instigator_serialized_cursor( - compressed_cursor, None - ).evaluation_id - self._next_evaluation_id = max(self._next_evaluation_id, stored_evaluation_id) - - stored_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None) - self._next_evaluation_id = max(self._next_evaluation_id, stored_cursor.evaluation_id) - - self._initialized_evaluation_id = True - - def _get_next_evaluation_id(self): 
- # Thread-safe way to generate a new evaluation ID across multiple - # workers running asset policy sensors at once - with self._evaluation_id_lock: - check.invariant(self._initialized_evaluation_id) - self._next_evaluation_id = self._next_evaluation_id + 1 - return self._next_evaluation_id - def core_loop( self, workspace_process_context: IWorkspaceProcessContext, @@ -471,8 +427,6 @@ def _run_iteration_impl( workspace = workspace_process_context.create_request_context() - if not self._initialized_evaluation_id: - self._initialize_evaluation_id(instance) sensors_and_repos: Sequence[Tuple[Optional[RemoteSensor], Optional[RemoteRepository]]] = [] if use_auto_materialize_sensors: @@ -798,21 +752,29 @@ def _process_auto_materialize_tick_generator( latest_tick = ticks[0] if ticks else None max_retries = instance.auto_materialize_max_tick_retries + self._logger.error(f"_hi{instigator_name}") # Determine if the most recent tick requires retrying retry_tick: Optional[InstigatorTick] = None - + override_evaluation_id: Optional[int] = None if latest_tick: + can_resume = ( + get_current_timestamp() - latest_tick.timestamp + ) <= MAX_TIME_TO_RESUME_TICK_SECONDS + previous_cursor_written = ( + latest_tick.tick_data.auto_materialize_evaluation_id + == stored_cursor.evaluation_id + ) + if can_resume and not previous_cursor_written: + # if the tick failed before writing a cursor, we don't want to advance the + # evaluation id yet + override_evaluation_id = latest_tick.tick_data.auto_materialize_evaluation_id + # If the previous tick matches the stored cursor's evaluation ID, check if it failed # or crashed partway through execution and needs to be resumed # Don't resume very old ticks though in case the daemon crashed for a long time and # then restarted - if ( - get_current_timestamp() - latest_tick.timestamp - <= MAX_TIME_TO_RESUME_TICK_SECONDS - and latest_tick.tick_data.auto_materialize_evaluation_id - == stored_cursor.evaluation_id - ): + if can_resume and 
previous_cursor_written: if latest_tick.status == TickStatus.STARTED: self._logger.warn( f"Tick for evaluation {stored_cursor.evaluation_id}{print_group_name} was interrupted part-way through, resuming" @@ -831,8 +793,9 @@ def _process_auto_materialize_tick_generator( error=None, timestamp=evaluation_time.timestamp(), end_timestamp=None, - ), + ) ) + # otherwise, tick completed normally, no need to do anything else: # (The evaluation IDs not matching indicates that the tick failed or crashed before # the cursor could be written, so no runs have been launched and it's safe to @@ -849,10 +812,6 @@ def _process_auto_materialize_tick_generator( if retry_tick: tick = retry_tick else: - # Evaluation ID will always be monotonically increasing, but will not always - # be auto-incrementing by 1 once there are multiple AMP evaluations happening in - # parallel - next_evaluation_id = self._get_next_evaluation_id() tick = instance.create_tick( TickData( instigator_origin_id=instigator_origin_id, @@ -863,9 +822,18 @@ def _process_auto_materialize_tick_generator( status=TickStatus.STARTED, timestamp=evaluation_time.timestamp(), selector_id=instigator_selector_id, - auto_materialize_evaluation_id=next_evaluation_id, + # we base new auto_materialize_evaluation_ids on the tick id, which we need to + # get from the response + auto_materialize_evaluation_id=None, + ) + ) + # set the new tick_id as the auto_materialize_evaluation_id + tick = tick._replace( + tick_data=tick.tick_data._replace( + auto_materialize_evaluation_id=override_evaluation_id or tick.tick_id ) ) + instance.update_tick(tick) with AutoMaterializeLaunchContext( tick, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py index b285e48e70ac7..0dd874cfa7370 100644 --- 
a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py @@ -45,7 +45,7 @@ set_auto_materialize_paused, ) from dagster._serdes.serdes import deserialize_value, serialize_value -from dagster._time import get_current_datetime +from dagster._time import get_current_datetime, get_current_timestamp from dagster_tests.definitions_tests.declarative_automation_tests.legacy_tests.updated_scenarios.basic_scenarios import ( basic_scenarios, @@ -449,7 +449,19 @@ def test_auto_materialize_sensor_no_transition(): def test_auto_materialize_sensor_transition(): with get_daemon_instance(paused=False) as instance: # Have been using global AMP, so there is a cursor - pre_sensor_evaluation_id = 12345 + pre_sensor_evaluation_id = 4 + for _ in range(pre_sensor_evaluation_id): + # create junk ticks so that the next tick id is greater than pre_sensor_evaluation_id + instance.create_tick( + TickData( + instigator_origin_id="", + instigator_name="", + instigator_type=InstigatorType.SCHEDULE, + status=TickStatus.SUCCESS, + timestamp=get_current_timestamp(), + run_ids=[], + ) + ) assert not get_has_migrated_to_sensors(instance) @@ -578,13 +590,15 @@ def test_auto_materialize_sensor_name_transition() -> None: # skip over the old state for the old name if sensor_state.instigator_name == "default_auto_materialize_sensor": continue - # ensure that we're properly accounting for the old cursor information + # we do not account for the old cursor as it is assumed that the current + # tick id will be strictly larger than the current asset daemon cursor + # value in the real world (as each evaluation creates a new tick) assert ( asset_daemon_cursor_from_instigator_serialized_cursor( cast(SensorInstigatorData, sensor_state.instigator_data).cursor, None, ).evaluation_id - > 2 + > 0 # real world should be larger ) @@ -600,7 +614,19 @@ def 
test_auto_materialize_sensor_ticks(num_threads): }, ) as instance: with _get_threadpool_executor(instance) as threadpool_executor: - pre_sensor_evaluation_id = 12345 + pre_sensor_evaluation_id = 3 + for _ in range(pre_sensor_evaluation_id): + # create junk ticks so that the next tick id will be 4 + instance.create_tick( + TickData( + instigator_origin_id="", + instigator_name="", + instigator_type=InstigatorType.SCHEDULE, + status=TickStatus.SUCCESS, + timestamp=get_current_timestamp(), + run_ids=[], + ) + ) instance.daemon_cursor_storage.set_cursor_values( { diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py index e8fe8903fc8e1..c60e32abe1e8c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py @@ -342,7 +342,9 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status == TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 # advances + assert ( + ticks[0].tick_data.auto_materialize_evaluation_id == 5 + ) # advances, skipping a few numbers assert "Oops new tick" in str(ticks[0].tick_data.error) @@ -370,7 +372,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat assert ticks[0].status != TickStatus.FAILURE assert ticks[0].timestamp == test_time.timestamp() assert ticks[0].tick_data.end_timestamp == test_time.timestamp() - assert ticks[0].tick_data.auto_materialize_evaluation_id == 
2 # finishes + assert ticks[0].tick_data.auto_materialize_evaluation_id == 5 # finishes spawn_ctx = multiprocessing.get_context("spawn")