Skip to content

Commit

Permalink
[WIP] Update evaluation id logic
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 17, 2024
1 parent fdd6922 commit 4804b8b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 66 deletions.
85 changes: 26 additions & 59 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,6 @@ def write(self) -> None:

class AssetDaemon(DagsterDaemon):
def __init__(self, settings: Mapping[str, Any], pre_sensor_interval_seconds: int):
self._initialized_evaluation_id = False
self._evaluation_id_lock = threading.Lock()
self._next_evaluation_id = None

self._pre_sensor_interval_seconds = pre_sensor_interval_seconds
self._last_pre_sensor_submit_time = None

Expand All @@ -360,46 +356,6 @@ def _get_print_sensor_name(self, sensor: Optional[RemoteSensor]) -> str:
)
return f" for {sensor.name} in {repo_name}"

def _initialize_evaluation_id(
self,
instance: DagsterInstance,
):
# Seeds self._next_evaluation_id with the largest evaluation ID found in the stored
# cursors and then marks the counter as initialized.
# NOTE(review): indentation is not visible in this view, so which of the statements
# below run while the lock is held should be confirmed against the real file.
#
# Find the largest stored evaluation ID across all auto-materialize cursors
# to initialize the thread-safe evaluation ID counter
with self._evaluation_id_lock:
sensor_states = check.not_none(instance.schedule_storage).all_instigator_state(
instigator_type=InstigatorType.SENSOR
)

# baseline of 0; every max() below can only raise it
self._next_evaluation_id = 0
for sensor_state in sensor_states:
# skip sensor states that have no instigator data, no sensor type, or a sensor type
# that is not handled by the asset daemon
if not (
sensor_state.sensor_instigator_data
and sensor_state.sensor_instigator_data.sensor_type
and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
):
continue

# when a cursor is stored, deserialize it to read its evaluation_id
compressed_cursor = sensor_state.sensor_instigator_data.cursor
if compressed_cursor:
stored_evaluation_id = asset_daemon_cursor_from_instigator_serialized_cursor(
compressed_cursor, None
).evaluation_id
self._next_evaluation_id = max(self._next_evaluation_id, stored_evaluation_id)

# the pre-sensor auto-materialize cursor also contributes to the maximum
stored_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None)
self._next_evaluation_id = max(self._next_evaluation_id, stored_cursor.evaluation_id)

# _get_next_evaluation_id checks this flag before handing out IDs
self._initialized_evaluation_id = True

def _get_next_evaluation_id(self):
    """Advance the shared evaluation ID counter by one and return the new value.

    The increment happens while holding ``self._evaluation_id_lock`` so that
    multiple workers running asset policy sensors at once can each generate a
    new evaluation ID safely. ``check.invariant`` is used to verify that the
    counter has been initialized before it is advanced.
    """
    with self._evaluation_id_lock:
        check.invariant(self._initialized_evaluation_id)
        incremented = self._next_evaluation_id + 1
        self._next_evaluation_id = incremented
        return incremented

def core_loop(
self,
workspace_process_context: IWorkspaceProcessContext,
Expand Down Expand Up @@ -471,8 +427,6 @@ def _run_iteration_impl(

workspace = workspace_process_context.create_request_context()

if not self._initialized_evaluation_id:
self._initialize_evaluation_id(instance)
sensors_and_repos: Sequence[Tuple[Optional[RemoteSensor], Optional[RemoteRepository]]] = []

if use_auto_materialize_sensors:
Expand Down Expand Up @@ -801,18 +755,25 @@ def _process_auto_materialize_tick_generator(

# Determine if the most recent tick requires retrying
retry_tick: Optional[InstigatorTick] = None

override_evaluation_id: Optional[int] = None
if latest_tick:
can_resume = (
get_current_timestamp() - latest_tick.timestamp
) <= MAX_TIME_TO_RESUME_TICK_SECONDS
previous_cursor_written = (
latest_tick.tick_data.auto_materialize_evaluation_id
== stored_cursor.evaluation_id
)
if can_resume and not previous_cursor_written:
# if the tick failed before writing a cursor, we don't want to advance the
# evaluation id yet
override_evaluation_id = latest_tick.tick_data.auto_materialize_evaluation_id

# If the previous tick matches the stored cursor's evaluation ID, check whether it failed
# or crashed partway through execution and needs to be resumed.
# Don't resume very old ticks, though, in case the daemon crashed, stayed down for a long
# time, and then restarted.
if (
get_current_timestamp() - latest_tick.timestamp
<= MAX_TIME_TO_RESUME_TICK_SECONDS
and latest_tick.tick_data.auto_materialize_evaluation_id
== stored_cursor.evaluation_id
):
if can_resume and previous_cursor_written:
if latest_tick.status == TickStatus.STARTED:
self._logger.warn(
f"Tick for evaluation {stored_cursor.evaluation_id}{print_group_name} was interrupted part-way through, resuming"
Expand All @@ -831,8 +792,9 @@ def _process_auto_materialize_tick_generator(
error=None,
timestamp=evaluation_time.timestamp(),
end_timestamp=None,
),
)
)
# otherwise, tick completed normally, no need to do anything
else:
# (The evaluation IDs not matching indicates that the tick failed or crashed before
# the cursor could be written, so no runs have been launched and it's safe to
Expand All @@ -849,10 +811,6 @@ def _process_auto_materialize_tick_generator(
if retry_tick:
tick = retry_tick
else:
# Evaluation ID will always be monotonically increasing, but will not always
# be auto-incrementing by 1 once there are multiple AMP evaluations happening in
# parallel
next_evaluation_id = self._get_next_evaluation_id()
tick = instance.create_tick(
TickData(
instigator_origin_id=instigator_origin_id,
Expand All @@ -863,9 +821,18 @@ def _process_auto_materialize_tick_generator(
status=TickStatus.STARTED,
timestamp=evaluation_time.timestamp(),
selector_id=instigator_selector_id,
auto_materialize_evaluation_id=next_evaluation_id,
# new auto_materialize_evaluation_ids are based on the tick id, which is only available
# on the tick returned by create_tick, so it is filled in after the tick is created
auto_materialize_evaluation_id=None,
)
)
# set the new tick_id as the auto_materialize_evaluation_id
tick = tick._replace(
tick_data=tick.tick_data._replace(
auto_materialize_evaluation_id=override_evaluation_id or tick.tick_id
)
)
instance.update_tick(tick)

with AutoMaterializeLaunchContext(
tick,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
set_auto_materialize_paused,
)
from dagster._serdes.serdes import deserialize_value, serialize_value
from dagster._time import get_current_datetime
from dagster._time import get_current_datetime, get_current_timestamp

from dagster_tests.definitions_tests.declarative_automation_tests.legacy_tests.updated_scenarios.basic_scenarios import (
basic_scenarios,
Expand Down Expand Up @@ -449,7 +449,19 @@ def test_auto_materialize_sensor_no_transition():
def test_auto_materialize_sensor_transition():
with get_daemon_instance(paused=False) as instance:
# Have been using global AMP, so there is a cursor
pre_sensor_evaluation_id = 12345
pre_sensor_evaluation_id = 4
for _ in range(pre_sensor_evaluation_id):
# create junk ticks so that the next tick id is greater than pre_sensor_evaluation_id
instance.create_tick(
TickData(
instigator_origin_id="",
instigator_name="",
instigator_type=InstigatorType.SCHEDULE,
status=TickStatus.SUCCESS,
timestamp=get_current_timestamp(),
run_ids=[],
)
)

assert not get_has_migrated_to_sensors(instance)

Expand Down Expand Up @@ -578,13 +590,15 @@ def test_auto_materialize_sensor_name_transition() -> None:
# skip over the old state for the old name
if sensor_state.instigator_name == "default_auto_materialize_sensor":
continue
# ensure that we're properly accounting for the old cursor information
# we do not account for the old cursor as it is assumed that the current
# tick id will be strictly larger than the current asset daemon cursor
# value in the real world (as each evaluation creates a new tick)
assert (
asset_daemon_cursor_from_instigator_serialized_cursor(
cast(SensorInstigatorData, sensor_state.instigator_data).cursor,
None,
).evaluation_id
> 2
> 0 # real world should be larger
)


Expand All @@ -600,7 +614,19 @@ def test_auto_materialize_sensor_ticks(num_threads):
},
) as instance:
with _get_threadpool_executor(instance) as threadpool_executor:
pre_sensor_evaluation_id = 12345
pre_sensor_evaluation_id = 3
for _ in range(pre_sensor_evaluation_id):
# create junk ticks so that the next tick id will be 4
instance.create_tick(
TickData(
instigator_origin_id="",
instigator_name="",
instigator_type=InstigatorType.SCHEDULE,
status=TickStatus.SUCCESS,
timestamp=get_current_timestamp(),
run_ids=[],
)
)

instance.daemon_cursor_storage.set_cursor_values(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
assert ticks[0].status == TickStatus.FAILURE
assert ticks[0].timestamp == test_time.timestamp()
assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 # advances
assert (
ticks[0].tick_data.auto_materialize_evaluation_id == 5
) # advances, skipping a few numbers

assert "Oops new tick" in str(ticks[0].tick_data.error)

Expand Down Expand Up @@ -370,7 +372,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
assert ticks[0].status != TickStatus.FAILURE
assert ticks[0].timestamp == test_time.timestamp()
assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
assert ticks[0].tick_data.auto_materialize_evaluation_id == 2 # finishes
assert ticks[0].tick_data.auto_materialize_evaluation_id == 5 # finishes


spawn_ctx = multiprocessing.get_context("spawn")
Expand Down

0 comments on commit 4804b8b

Please sign in to comment.