Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug when switching from non-user-code to user-code #25381

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,11 @@ def _run_iteration_impl(
if not eligible_sensors_and_repos:
return

all_auto_materialize_states = {
all_sensor_states = {
sensor_state.selector_id: sensor_state
for sensor_state in instance.all_instigator_state(
instigator_type=InstigatorType.SENSOR
)
if (
sensor_state.sensor_instigator_data
and sensor_state.sensor_instigator_data.sensor_type
and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
)
}

if not self._checked_migrations:
Expand All @@ -517,13 +512,11 @@ def _run_iteration_impl(
self._logger.info(
"Translating legacy cursor into a new cursor for each new automation policy sensor"
)
all_auto_materialize_states = (
self._create_initial_sensor_cursors_from_raw_cursor(
instance,
eligible_sensors_and_repos,
all_auto_materialize_states,
pre_sensor_cursor,
)
all_sensor_states = self._create_initial_sensor_cursors_from_raw_cursor(
instance,
eligible_sensors_and_repos,
all_sensor_states,
pre_sensor_cursor,
)

set_has_migrated_to_sensors(instance)
Expand All @@ -533,8 +526,8 @@ def _run_iteration_impl(
self._logger.info(
"Renaming any states corresponding to the legacy default name"
)
all_auto_materialize_states = self._copy_default_auto_materialize_sensor_states(
instance, all_auto_materialize_states
all_sensor_states = self._copy_default_auto_materialize_sensor_states(
instance, all_sensor_states
)
set_has_migrated_sensor_names(instance)

Expand All @@ -543,7 +536,7 @@ def _run_iteration_impl(
for sensor, repo in eligible_sensors_and_repos:
selector_id = sensor.selector_id
if sensor.get_current_instigator_state(
all_auto_materialize_states.get(selector_id)
all_sensor_states.get(selector_id)
).is_running:
sensors_and_repos.append((sensor, repo))

Expand All @@ -554,12 +547,12 @@ def _run_iteration_impl(
None,
) # Represents that there's a single set of ticks with no underlying sensor
)
all_auto_materialize_states = {}
all_sensor_states = {}

for sensor, repo in sensors_and_repos:
if sensor:
selector_id = sensor.selector.get_id()
auto_materialize_state = all_auto_materialize_states.get(selector_id)
auto_materialize_state = all_sensor_states.get(selector_id)
else:
selector_id = None
auto_materialize_state = None
Expand Down Expand Up @@ -617,7 +610,7 @@ def _create_initial_sensor_cursors_from_raw_cursor(
self,
instance: DagsterInstance,
sensors_and_repos: Sequence[Tuple[RemoteSensor, RemoteRepository]],
all_auto_materialize_states: Mapping[str, InstigatorState],
all_sensor_states: Mapping[str, InstigatorState],
pre_sensor_cursor: AssetDaemonCursor,
) -> Mapping[str, InstigatorState]:
start_status = (
Expand Down Expand Up @@ -645,7 +638,7 @@ def _create_initial_sensor_cursors_from_raw_cursor(
),
)

if all_auto_materialize_states.get(sensor.selector_id):
if all_sensor_states.get(sensor.selector_id):
instance.update_instigator_state(new_auto_materialize_state)
else:
instance.add_instigator_state(new_auto_materialize_state)
Expand All @@ -657,16 +650,21 @@ def _create_initial_sensor_cursors_from_raw_cursor(
def _copy_default_auto_materialize_sensor_states(
self,
instance: DagsterInstance,
all_auto_materialize_states: Mapping[str, InstigatorState],
all_sensor_states: Mapping[str, InstigatorState],
) -> Mapping[str, InstigatorState]:
"""Searches for sensors named `default_auto_materialize_sensor` and copies their state
to a sensor in the same repository named `default_automation_condition_sensor`.
"""
result = dict(all_auto_materialize_states)

for instigator_state in all_auto_materialize_states.values():
# only migrate instigators with the name "default_auto_materialize_sensor"
if instigator_state.origin.instigator_name != "default_auto_materialize_sensor":
result = dict(all_sensor_states)

for instigator_state in all_sensor_states.values():
# only migrate instigators with the name "default_auto_materialize_sensor" and are
# handled by the asset daemon
if instigator_state.origin.instigator_name != "default_auto_materialize_sensor" and (
instigator_state.sensor_instigator_data
and instigator_state.sensor_instigator_data.sensor_type
and instigator_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
):
continue
new_sensor_origin = instigator_state.origin._replace(
instigator_name="default_automation_condition_sensor"
Expand All @@ -679,7 +677,7 @@ def _copy_default_auto_materialize_sensor_states(
)
new_sensor_selector_id = new_sensor_origin.get_selector().get_id()
result[new_sensor_selector_id] = new_auto_materialize_state
if all_auto_materialize_states.get(new_sensor_selector_id):
if all_sensor_states.get(new_sensor_selector_id):
instance.update_instigator_state(new_auto_materialize_state)
else:
instance.add_instigator_state(new_auto_materialize_state)
Expand Down
5 changes: 0 additions & 5 deletions python_modules/dagster/dagster/_daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,6 @@ def execute_sensor_iteration(
all_sensor_states = {
sensor_state.selector_id: sensor_state
for sensor_state in instance.all_instigator_state(instigator_type=InstigatorType.SENSOR)
if not ( # filter out sensors state handled by asset daemon
sensor_state.sensor_instigator_data
and sensor_state.sensor_instigator_data.sensor_type
and sensor_state.sensor_instigator_data.sensor_type.is_handled_by_asset_daemon
)
}

tick_retention_settings = instance.get_tick_retention_settings(InstigatorType.SENSOR)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import dagster as dg


@dg.asset
def root() -> None: ...


@dg.asset(deps=[root], automation_condition=dg.AutomationCondition.eager())
def downstream() -> None: ...


defs = dg.Definitions(assets=[root, downstream])
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dagster as dg


def get_defs() -> dg.Definitions:
from .simple_defs import defs as simple_defs # noqa

return dg.Definitions(
assets=simple_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", user_code=False
)
],
)


defs = get_defs()
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dagster as dg


def get_defs() -> dg.Definitions:
from .simple_defs import defs as simple_defs # noqa

return dg.Definitions(
assets=simple_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", user_code=True
)
],
)


defs = get_defs()
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import sys
from contextlib import contextmanager
from typing import AbstractSet, Mapping, Sequence, cast
from typing import AbstractSet, Mapping, Optional, Sequence, cast

import dagster._check as check
import pytest
Expand Down Expand Up @@ -37,7 +37,9 @@
from dagster._time import get_current_datetime


def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin:
def get_code_location_origin(
filename: str, location_name: Optional[str] = None
) -> InProcessCodeLocationOrigin:
return InProcessCodeLocationOrigin(
loadable_target_origin=LoadableTargetOrigin(
executable_path=sys.executable,
Expand All @@ -46,7 +48,7 @@ def get_code_location_origin(filename: str) -> InProcessCodeLocationOrigin:
),
working_directory=os.getcwd(),
),
location_name=filename,
location_name=location_name or filename,
)


Expand Down Expand Up @@ -521,3 +523,54 @@ def test_backfill_with_runs_and_checks() -> None:
assert len(backfills) == 0
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0


def test_toggle_user_code() -> None:
with instance_for_test(
overrides={
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
},
}
) as instance, get_threadpool_executor() as executor:
user_code_target = InProcessTestWorkspaceLoadTarget(
[get_code_location_origin("simple_user_code", location_name="simple")]
)
non_user_code_target = InProcessTestWorkspaceLoadTarget(
[get_code_location_origin("simple_non_user_code", location_name="simple")]
)

# start off with non user code target, just do the setup once
with create_test_daemon_workspace_context(
workspace_load_target=non_user_code_target, instance=instance
) as context:
_setup_instance(context)

time = get_current_datetime()
# toggle back and forth between target types
for target in [non_user_code_target, user_code_target, non_user_code_target]:
with create_test_daemon_workspace_context(
workspace_load_target=target, instance=instance
) as context:
time += datetime.timedelta(seconds=35)
with freeze_time(time):
# first tick, nothing happened
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0

time += datetime.timedelta(seconds=35)
with freeze_time(time):
# second tick, root gets updated
instance.report_runless_asset_event(AssetMaterialization("root"))
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert runs[0].asset_selection == {AssetKey("downstream")}

time += datetime.timedelta(seconds=35)
with freeze_time(time):
# third tick, don't kick off again
_execute_ticks(context, executor)
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 0