diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_sensors.ambr b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_sensors.ambr index afe042838469a..f5abcbd32b860 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_sensors.ambr +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_sensors.ambr @@ -98,6 +98,26 @@ }), ]), }), + dict({ + 'description': None, + 'minIntervalSeconds': 30, + 'name': 'default_automation_condition_sensor', + 'sensorState': dict({ + 'runs': list([ + ]), + 'runsCount': 0, + 'status': 'STOPPED', + 'ticks': list([ + ]), + }), + 'targets': list([ + dict({ + 'mode': 'default', + 'pipelineName': '__ASSET_JOB', + 'solidSelection': None, + }), + ]), + }), dict({ 'description': None, 'minIntervalSeconds': 30, diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index 5d42a80d7b3a4..9444d9871b2f3 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -101,7 +101,12 @@ from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition from dagster._core.definitions.partition import PartitionedConfig from dagster._core.definitions.reconstruct import ReconstructableRepository -from dagster._core.definitions.sensor_definition import RunRequest, SensorDefinition, SkipReason +from dagster._core.definitions.sensor_definition import ( + RunRequest, + SensorDefinition, + SensorType, + SkipReason, +) from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition from dagster._core.errors import DagsterInvalidDefinitionError from dagster._core.log_manager import coerce_valid_log_level @@ -2166,6 +2171,12 @@ def define_asset_checks(): def 
_targets_asset_job(instigator: Union[ScheduleDefinition, SensorDefinition]) -> bool: + if isinstance(instigator, SensorDefinition) and instigator.sensor_type in ( + # these contain asset selections, which are invalid with the dict repo + SensorType.AUTOMATION, + SensorType.AUTO_MATERIALIZE, + ): + return True try: return instigator.job_name in asset_job_names or instigator.has_anonymous_job except DagsterInvalidDefinitionError: # thrown when `job_name` is invalid diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py index a29a4528a31ec..672ef09d1b63d 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_sensors.py @@ -733,7 +733,7 @@ def test_get_sensors(self, graphql_context: WorkspaceRequestContext, snapshot): assert result.data assert result.data["sensorsOrError"] - assert result.data["sensorsOrError"]["__typename"] == "Sensors" + assert result.data["sensorsOrError"]["__typename"] == "Sensors", result.data results = result.data["sensorsOrError"]["results"] # Snapshot is different for test_dict_repo because it does not contain any asset jobs, diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index 376d09c970ec8..dc071e9daf5f9 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -17,8 +17,9 @@ from dagster._core.definitions.utils import check_valid_name from dagster._utils.tags import normalize_tags -EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills" MAX_ENTITIES = 500
+EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills" +DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME = "default_automation_condition_sensor" def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext): @@ -49,6 +50,11 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor default_condition=sensor_def.default_condition, logger=context.log, ) + check.invariant( + context.instance.auto_materialize_use_sensors, + "Cannot evaluate an AutomationConditionSensorDefinition if the instance setting " + "`auto_materialize: use_sensors` is set to False. Update your configuration to prevent this error.", + ) check.invariant( evaluation_context.total_keys <= MAX_ENTITIES, f'AutomationConditionSensorDefintion "{sensor_def.name}" targets {evaluation_context.total_keys} ' diff --git a/python_modules/dagster/dagster/_core/definitions/definitions_class.py b/python_modules/dagster/dagster/_core/definitions/definitions_class.py index 535844f07c666..a889119fc71a5 100644 --- a/python_modules/dagster/dagster/_core/definitions/definitions_class.py +++ b/python_modules/dagster/dagster/_core/definitions/definitions_class.py @@ -43,7 +43,10 @@ from dagster._core.definitions.schedule_definition import ScheduleDefinition from dagster._core.definitions.sensor_definition import SensorDefinition from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition -from dagster._core.definitions.utils import dedupe_object_refs +from dagster._core.definitions.utils import ( + add_default_automation_condition_sensor, + dedupe_object_refs, +) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.build_resources import wrap_resources_for_execution from dagster._core.execution.with_resources import with_resources @@ -268,10 +271,17 @@ def _create_repository_using_definitions_args( # First, dedupe all definition types.
sensors = dedupe_object_refs(sensors) jobs = dedupe_object_refs(jobs) - assets = dedupe_object_refs(assets) + assets = _canonicalize_specs_to_assets_defs(dedupe_object_refs(assets)) schedules = dedupe_object_refs(schedules) asset_checks = dedupe_object_refs(asset_checks) + # add in a default automation condition sensor definition if required + sensors = add_default_automation_condition_sensor( + sensors, + [asset for asset in assets if not isinstance(asset, CacheableAssetsDefinition)], + asset_checks or [], + ) + executor_def = ( executor if isinstance(executor, ExecutorDefinition) or executor is None @@ -296,7 +306,7 @@ def _create_repository_using_definitions_args( ) def created_repo(): return [ - *with_resources(_canonicalize_specs_to_assets_defs(assets or []), resource_defs), + *with_resources(assets, resource_defs), *with_resources(asset_checks or [], resource_defs), *(schedules_with_resources), *(sensors_with_resources), diff --git a/python_modules/dagster/dagster/_core/definitions/utils.py b/python_modules/dagster/dagster/_core/definitions/utils.py index 7a177be59b511..389302e27f310 100644 --- a/python_modules/dagster/dagster/_core/definitions/utils.py +++ b/python_modules/dagster/dagster/_core/definitions/utils.py @@ -11,17 +11,20 @@ Mapping, Optional, Sequence, + Set, Tuple, TypeVar, + Union, cast, ) import yaml import dagster._check as check +from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._core.utils import is_valid_email -from dagster._utils.warnings import deprecation_warning +from dagster._utils.warnings import deprecation_warning, disable_dagster_warnings from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls DEFAULT_OUTPUT = "result" @@ -59,10 +62,16 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_key import AssetKey + from dagster._core.definitions.asset_selection import AssetSelection + from 
dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) + from dagster._core.definitions.sensor_definition import SensorDefinition + from dagster._core.definitions.source_asset import SourceAsset + from dagster._core.remote_representation.external import RemoteSensor class NoValueSentinel: @@ -311,3 +320,93 @@ def resolve_automation_condition( def dedupe_object_refs(objects: Optional[Iterable[T]]) -> Sequence[T]: """Dedupe definitions by reference equality.""" return list({id(obj): obj for obj in objects}.values()) if objects is not None else [] + + +def add_default_automation_condition_sensor( + sensors: Sequence["SensorDefinition"], + assets: Iterable[Union["AssetsDefinition", "SourceAsset"]], + asset_checks: Iterable["AssetsDefinition"], +) -> Sequence["SensorDefinition"]: + """Adds a default automation condition sensor if the provided sensors do not already handle all + provided assets.
+ """ + from dagster._core.definitions.asset_graph import AssetGraph + from dagster._core.definitions.automation_condition_sensor_definition import ( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, + AutomationConditionSensorDefinition, + ) + + with disable_dagster_warnings(): + asset_graph = AssetGraph.from_assets([*assets, *asset_checks]) + sensor_selection = get_default_automation_condition_sensor_selection(sensors, asset_graph) + if sensor_selection is not None: + default_sensor = AutomationConditionSensorDefinition( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, asset_selection=sensor_selection + ) + sensors = [*sensors, default_sensor] + + return sensors + + +def get_default_automation_condition_sensor_selection( + sensors: Sequence[Union["SensorDefinition", "RemoteSensor"]], asset_graph: "BaseAssetGraph" +) -> Optional["AssetSelection"]: + """Returns the asset selection for a default automation condition sensor, or None if every + key with an automation condition (or observable asset with an auto-observe interval) is + already covered by the asset selection of an existing automation condition sensor. + """ + from dagster._core.definitions.asset_selection import AssetSelection + from dagster._core.definitions.sensor_definition import SensorType + + automation_condition_sensors = sorted( + ( + s + for s in sensors + if s.sensor_type in (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION) + ), + key=lambda s: s.name, + ) + + automation_condition_keys = set() + for k in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys: + if asset_graph.get(k).automation_condition is not None: + automation_condition_keys.add(k) + + has_auto_observe_keys = False + for k in asset_graph.observable_asset_keys: + if ( + # for backcompat, treat auto-observe assets as if they have a condition + asset_graph.get(k).automation_condition is not None + or asset_graph.get(k).auto_observe_interval_minutes is not None + ): + has_auto_observe_keys = True + automation_condition_keys.add(k) + + # get the set of keys that are handled by an existing sensor + covered_keys: Set[EntityKey] = set() + for sensor in automation_condition_sensors: + selection = check.not_none(sensor.asset_selection) + covered_keys.update( + selection.resolve(asset_graph) |
selection.resolve_checks(asset_graph) + ) + + default_sensor_keys = automation_condition_keys - covered_keys + if default_sensor_keys: + # Use AssetSelection.all if the default sensor is the only sensor - otherwise + # enumerate the assets that are not already included in some other + # non-default sensor + default_sensor_asset_selection = AssetSelection.all(include_sources=has_auto_observe_keys) + + # if there are any asset checks, include checks in the selection + if any(isinstance(k, AssetCheckKey) for k in default_sensor_keys): + default_sensor_asset_selection |= AssetSelection.all_asset_checks() + + # remove any selections that are already covered + for sensor in automation_condition_sensors: + default_sensor_asset_selection = default_sensor_asset_selection - check.not_none( + sensor.asset_selection + ) + return default_sensor_asset_selection + # no additional sensor required + else: + return None diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index e1b271153c1ed..2b4779d8fc96e 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -23,6 +23,9 @@ from dagster._config.snap import ConfigFieldSnap, ConfigSchemaSnapshot from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME +from dagster._core.definitions.automation_condition_sensor_definition import ( + DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, +) from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataValue @@ -40,6 +43,7 @@ DefaultSensorStatus, SensorType, ) +from dagster._core.definitions.utils import get_default_automation_condition_sensor_selection from dagster._core.errors import
DagsterError from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle from dagster._core.instance import DagsterInstance @@ -86,7 +90,6 @@ from dagster._utils.schedules import schedule_execution_time_iterator if TYPE_CHECKING: - from dagster._core.definitions.asset_key import EntityKey from dagster._core.definitions.remote_asset_graph import RemoteRepositoryAssetGraph from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap @@ -191,9 +194,6 @@ def _utilized_env_vars(self) -> Mapping[str, Sequence[EnvVarConsumer]]: def get_utilized_env_vars(self) -> Mapping[str, Sequence[EnvVarConsumer]]: return self._utilized_env_vars - def get_default_auto_materialize_sensor_name(self) -> str: - return "default_automation_condition_sensor" - @property @cached_method def _sensors(self) -> Dict[str, "RemoteSensor"]: @@ -202,81 +202,28 @@ def _sensors(self) -> Dict[str, "RemoteSensor"]: for sensor_snap in self.repository_snap.sensors } - if self._instance.auto_materialize_use_sensors: - asset_graph = self.asset_graph - - has_any_auto_observe_source_assets = False - - existing_automation_condition_sensors = { - sensor_name: sensor - for sensor_name, sensor in sensor_datas.items() - if sensor.sensor_type in (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION) - } - - covered_entity_keys: Set[EntityKey] = set() - for sensor in existing_automation_condition_sensors.values(): - selection = check.not_none(sensor.asset_selection) - covered_entity_keys = covered_entity_keys.union( - # for now, all asset checks are handled by the same asset as their asset - selection.resolve(asset_graph) | selection.resolve_checks(asset_graph) - ) - - default_sensor_entity_keys = set() - for entity_key in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys: - if not asset_graph.get(entity_key).automation_condition: - continue - - if entity_key not in covered_entity_keys: - 
default_sensor_entity_keys.add(entity_key) - - for asset_key in asset_graph.observable_asset_keys: - if ( - asset_graph.get(asset_key).auto_observe_interval_minutes is None - and asset_graph.get(asset_key).automation_condition is None - ): - continue - - has_any_auto_observe_source_assets = True - - if asset_key not in covered_entity_keys: - default_sensor_entity_keys.add(asset_key) - - if default_sensor_entity_keys: - default_sensor_asset_check_keys = { - key for key in default_sensor_entity_keys if isinstance(key, AssetCheckKey) - } - # Use AssetSelection.all if the default sensor is the only sensor - otherwise - # enumerate the assets that are not already included in some other - # non-default sensor - default_sensor_asset_selection = AssetSelection.all( - include_sources=has_any_auto_observe_source_assets - ) - # if there are any asset checks, include them - if default_sensor_asset_check_keys: - default_sensor_asset_selection |= AssetSelection.all_asset_checks() - - for sensor in existing_automation_condition_sensors.values(): - default_sensor_asset_selection = ( - default_sensor_asset_selection - check.not_none(sensor.asset_selection) - ) - - default_sensor_data = SensorSnap( - name=self.get_default_auto_materialize_sensor_name(), - job_name=None, - op_selection=None, - asset_selection=default_sensor_asset_selection, - mode=None, - min_interval=30, - description=None, - target_dict={}, - metadata=None, - default_status=None, - sensor_type=SensorType.AUTO_MATERIALIZE, - run_tags=None, - ) - sensor_datas[default_sensor_data.name] = RemoteSensor( - default_sensor_data, self._handle - ) + # if necessary, create a default automation condition sensor + # NOTE: if a user's code location is at a version >= 1.9, then this step should + # never be necessary, as the sensor is added during the Definitions construction process + default_sensor_selection = get_default_automation_condition_sensor_selection( + sensors=list(sensor_datas.values()),
asset_graph=self.asset_graph + ) + if default_sensor_selection is not None: + default_sensor_data = SensorSnap( + name=DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, + job_name=None, + op_selection=None, + asset_selection=default_sensor_selection, + mode=None, + min_interval=30, + description=None, + target_dict={}, + metadata=None, + default_status=None, + sensor_type=SensorType.AUTO_MATERIALIZE, + run_tags=None, + ) + sensor_datas[default_sensor_data.name] = RemoteSensor(default_sensor_data, self._handle) return sensor_datas @@ -466,6 +413,18 @@ def _sensor_mappings( try: keys = sensor.asset_selection.resolve(self.asset_graph) for key in keys: + # only count an asset as targeted by an automation condition sensor if it + # has an automation condition + if sensor.sensor_type in ( + SensorType.AUTO_MATERIALIZE, + SensorType.AUTOMATION, + ): + node_snap = self.get_asset_node_snap(key) + if not node_snap or not ( + node_snap.automation_condition + or node_snap.automation_condition_snapshot + ): + continue asset_key_mapping[key].append(sensor) except DagsterError: pass diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py index 7d285b2bbc518..5a0784bbf92de 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py @@ -150,13 +150,13 @@ def test_default_auto_materialize_sensors_without_observable( assert auto_materialize_sensor.asset_selection == AssetSelection.all(include_sources=False) -def test_no_default_auto_materialize_sensors(instance_without_auto_materialize_sensors): +def test_opt_out_default_auto_materialize_sensors(instance_without_auto_materialize_sensors): 
repo_handle = RepositoryHandle.for_test( location_name="foo_location", repository_name="bar_repo", ) - # If not opted in, no default sensors are created + # If opted out, we still do create default auto materialize sensors remote_repo = RemoteRepository( RepositorySnap.from_def( defs.get_repository_def(), @@ -165,8 +165,9 @@ def test_no_default_auto_materialize_sensors(instance_without_auto_materialize_s instance=instance_without_auto_materialize_sensors, ) sensors = remote_repo.get_sensors() - assert len(sensors) == 1 - assert sensors[0].name == "normal_sensor" + assert len(sensors) == 2 + assert sensors[0].name == "default_automation_condition_sensor" + assert sensors[1].name == "normal_sensor" def test_combine_default_sensors_with_non_default_sensors(instance_with_auto_materialize_sensors): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py new file mode 100644 index 0000000000000..1fcb97e22114e --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/old_code_server_simulation.py @@ -0,0 +1,34 @@ +import dagster as dg +from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME, build_asset_job +from dagster._core.definitions.repository_definition.repository_data import CachingRepositoryData +from dagster._core.definitions.repository_definition.repository_definition import ( + RepositoryDefinition, +) + + +@dg.asset(automation_condition=dg.AutomationCondition.initial_evaluation()) +def old_asset() -> None: ... 
+ + +# directly construct so we can simulate an older repo without auto-constructed sensors +repo_data = CachingRepositoryData( + jobs={ + # implicit asset job to make this valid repo data + IMPLICIT_ASSET_JOB_NAME: build_asset_job( + IMPLICIT_ASSET_JOB_NAME, + AssetGraph.from_assets([old_asset]), + allow_different_partitions_defs=True, + ) + }, + schedules={}, + sensors={}, + source_assets_by_key={}, + assets_defs_by_key={old_asset.key: old_asset}, + asset_checks_defs_by_key={}, + top_level_resources={}, + utilized_env_vars={}, + unresolved_partitioned_asset_schedules={}, +) +repo = RepositoryDefinition("repo", repository_data=repo_data) +assert repo.schedule_defs == [] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index 229b112fa0b7c..716cdcd59b1bf 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -621,3 +621,15 @@ def test_500_eager_assets_user_code(capsys) -> None: # more specific check for line in capsys.readouterr(): assert "RESOURCE_EXHAUSTED" not in line + + +def test_simple_old_code_server() -> None: + with get_grpc_workspace_request_context( + "old_code_server_simulation" + ) as context, get_threadpool_executor() as executor: + time = datetime.datetime(2024, 8, 16, 1, 35) + with freeze_time(time): + # initial evaluation + _execute_ticks(context, executor) + runs = _get_runs_for_latest_ticks(context) + assert len(runs) == 1