From 8e94ee35e6bfc528ee640bf37026f0c5407a9645 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Thu, 10 Oct 2024 19:02:19 -0700 Subject: [PATCH] Add error for user code sensors with too many entities targeted --- .../toys/auto_materializing/large_graph.py | 53 ++++++++++--------- .../automation_condition_sensor_definition.py | 13 ++++- .../automation_tick_evaluation_context.py | 5 ++ .../automation_condition.py | 3 +- ..._automation_condition_sensor_definition.py | 43 ++++++++++++++- 5 files changed, 88 insertions(+), 29 deletions(-) diff --git a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py index 3a8626841e750..0d0f20cdd3032 100644 --- a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py +++ b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py @@ -12,6 +12,7 @@ repository, ) from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._utils.warnings import disable_dagster_warnings class AssetLayerConfig(NamedTuple): @@ -23,36 +24,38 @@ class AssetLayerConfig(NamedTuple): def build_assets( id: str, layer_configs: Sequence[AssetLayerConfig], - automation_condition: AutomationCondition, + automation_condition: Optional[AutomationCondition], ) -> List[AssetsDefinition]: layers = [] - for layer_config in layer_configs: - parent_index = 0 - layer = [] - for i in range(layer_config.n_assets): - if layer_config.n_upstreams_per_asset > 0: - # each asset connects to n_upstreams_per_asset assets from the above layer, chosen - # in a round-robin manner - non_argument_deps = { - layers[-1][(parent_index + j) % len(layers[-1])].key - for j in range(layer_config.n_upstreams_per_asset) - } - parent_index += layer_config.n_upstreams_per_asset - else: - non_argument_deps = set() + with disable_dagster_warnings(): + for layer_num, layer_config in enumerate(layer_configs): + parent_index = 0 + layer = [] + for i in range(layer_config.n_assets): + if layer_config.n_upstreams_per_asset > 0: + # each asset connects to n_upstreams_per_asset assets from the above layer, chosen + # in a round-robin manner + non_argument_deps = { + layers[-1][(parent_index + j) % len(layers[-1])].key + for j in range(layer_config.n_upstreams_per_asset) + } + parent_index += layer_config.n_upstreams_per_asset + else: + non_argument_deps = set() - @asset( - partitions_def=layer_config.partitions_def, - name=f"{id}_{len(layers)}_{i}", - automation_condition=automation_condition, - non_argument_deps=non_argument_deps, - ) - def _asset(): - pass + @asset( + partitions_def=layer_config.partitions_def, + name=f"{id}_{len(layers)}_{i}", + automation_condition=automation_condition, + non_argument_deps=non_argument_deps, + group_name=f"g{layer_num}", + ) + def _asset(): + pass - layer.append(_asset) - layers.append(layer) + layer.append(_asset) + layers.append(layer) return list(itertools.chain(*layers)) diff --git a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py index 33ce97d8e16c1..419576d7f31b8 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_condition_sensor_definition.py @@ -18,6 +18,7 @@ from dagster._utils.tags import normalize_tags EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills" +MAX_ENTITIES = 500 def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext): @@ -34,7 +35,8 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor context.cursor, asset_graph, ) - run_requests, new_cursor, updated_evaluations = AutomationTickEvaluationContext( + + evaluation_context = AutomationTickEvaluationContext( evaluation_id=cursor.evaluation_id, instance=context.instance, asset_graph=asset_graph, @@ -46,7 +48,14 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor emit_backfills=sensor_def.emit_backfills, default_condition=sensor_def.default_condition, logger=context.log, - ).evaluate() + ) + check.invariant( + evaluation_context.total_keys <= MAX_ENTITIES, + f'AutomationConditionSensorDefintion "{sensor_def.name}" targets {evaluation_context.total_keys} ' + f"assets or checks, which is more than the limit of {MAX_ENTITIES}. Either set `use_user_code_server` to `False`, " + "or split this sensor up into smaller selections.", + ) + run_requests, new_cursor, updated_evaluations = evaluation_context.evaluate() return SensorResult( run_requests=run_requests, diff --git a/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py b/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py index e682ff29161d8..2dbe9169e5a0e 100644 --- a/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/automation_tick_evaluation_context.py @@ -72,6 +72,7 @@ def __init__( ) if default_condition or asset_graph.get(entity_key).automation_condition is not None } + self._total_keys = len(resolved_entity_keys) self._evaluation_id = evaluation_id self._evaluator = AutomationConditionEvaluator( entity_keys=resolved_entity_keys, @@ -95,6 +96,10 @@ def cursor(self) -> AssetDaemonCursor: def asset_graph(self) -> BaseAssetGraph: return self._evaluator.asset_graph + @property + def total_keys(self) -> int: + return self._total_keys + def _legacy_build_auto_observe_run_requests(self) -> Sequence[RunRequest]: current_timestamp = self._evaluator.evaluation_time.timestamp() assets_to_auto_observe: Set[AssetKey] = set() diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index a7168b7a0f603..bd2d1172dabae 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -161,7 +161,8 @@ def as_auto_materialize_policy(self) -> "AutoMaterializePolicy": """Returns an AutoMaterializePolicy which contains this condition.""" from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy - return AutoMaterializePolicy.from_automation_condition(self) + with disable_dagster_warnings(): + return AutoMaterializePolicy.from_automation_condition(self) @abstractmethod def evaluate( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py index 713468fd25d15..56dc0319e525a 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_automation_condition_sensor_definition.py @@ -1,12 +1,15 @@ import pytest from dagster import AssetSelection, DefaultSensorStatus, build_sensor_context -from dagster._check.functions import ParameterCheckError +from dagster._check.functions import CheckError, ParameterCheckError from dagster._core.definitions.automation_condition_sensor_definition import ( AutomationConditionSensorDefinition, ) from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.instance import DagsterInstance +from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets @pytest.mark.parametrize( @@ -49,3 +52,41 @@ def test_default_condition() -> None: "foo", asset_selection="*", default_condition=AutomationCondition.eager(), user_code=True ) assert sensor.default_condition == AutomationCondition.eager() + + +def test_limits() -> None: + sensor = AutomationConditionSensorDefinition("foo", asset_selection="*", user_code=True) + + defs = Definitions( + assets=build_assets( + "test", + layer_configs=[AssetLayerConfig(1000)], + automation_condition=AutomationCondition.eager(), + ) + ) + with pytest.raises(CheckError, match='"foo" targets 1000 assets or checks'): + sensor( + build_sensor_context( + instance=DagsterInstance.ephemeral(), + repository_def=defs.get_repository_def(), + ), + ) + + # more than 500 total assets, but only 400 with a condition + with_condition = build_assets( + "cond", + layer_configs=[AssetLayerConfig(400)], + automation_condition=AutomationCondition.eager(), + ) + without_condition = build_assets( + "no_cond", + layer_configs=[AssetLayerConfig(400)], + automation_condition=None, + ) + defs = Definitions(assets=[*with_condition, *without_condition]) + sensor( + build_sensor_context( + instance=DagsterInstance.ephemeral(), + repository_def=defs.get_repository_def(), + ), + )