diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index a832b9202546e..1301252c21d9c 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -3,6 +3,7 @@ from typing import ( TYPE_CHECKING, AbstractSet, + Awaitable, Callable, Dict, Literal, @@ -396,18 +397,18 @@ def _compute_execution_failed_check_subset( def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]: return self.compute_subset_with_status(key, None) - def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: from dagster._core.storage.partition_status_cache import AssetStatusCacheValue partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_in_progress_partition_subsets(partitions_def) - else: - value = self._queryer.get_in_progress_asset_subset(asset_key=key).value + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) + return ( + cache_value.get_in_progress_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: @@ -418,21 +419,21 @@ def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySub ) return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: from dagster._core.storage.partition_status_cache import AssetStatusCacheValue partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) - if cache_value is None: - value = partitions_def.empty_subset() - else: - value = cache_value.deserialize_failed_partition_subsets(partitions_def) - else: - value = self._queryer.get_failed_asset_subset(asset_key=key).value + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) + return ( + cache_value.get_failed_subset(self, key, partitions_def) + if cache_value + else self.get_empty_subset(key=key) + ) + value = self._queryer.get_failed_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_missing_asset_subset( + async def _compute_missing_asset_subset( self, key: AssetKey, from_subset: EntitySubset ) -> EntitySubset[AssetKey]: """Returns a subset which is the subset of the input subset that has never been materialized @@ -448,7 +449,7 @@ def _compute_missing_asset_subset( # cheap call which takes advantage of the partition status cache partitions_def = self._get_partitions_def(key) if partitions_def: - cache_value = AssetStatusCacheValue.blocking_get(self, (key, partitions_def)) + cache_value = await AssetStatusCacheValue.gen(self, (key, partitions_def)) return ( cache_value.get_materialized_subset(self, key, partitions_def) if cache_value @@ -489,16 +490,18 @@ def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset ) @cached_method - def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_execution_failed_check_subset, asset_method=self._compute_execution_failed_asset_subset, ) @cached_method - def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) -> EntitySubset: - return _dispatch( + async def compute_missing_subset( + self, *, key: EntityKey, from_subset: EntitySubset + ) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_missing_check_subset, asset_method=functools.partial( @@ -621,14 +624,14 @@ def _build_multi_partition_subset( O_Dispatch = TypeVar("O_Dispatch") -def _dispatch( +async def _dispatch( *, key: EntityKey, check_method: Callable[[AssetCheckKey], O_Dispatch], - asset_method: Callable[[AssetKey], O_Dispatch], + asset_method: Callable[[AssetKey], Awaitable[O_Dispatch]], ) -> O_Dispatch: """Applies a method for either a check or an asset.""" if isinstance(key, AssetCheckKey): return check_method(key) else: - return asset_method(key) + return await asset_method(key) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py index 8033c9f50ca92..5feb99137240f 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py @@ -10,6 +10,7 @@ from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, + AutomationResult, ) from dagster._core.definitions.declarative_automation.legacy.legacy_context import ( LegacyRuleEvaluationContext, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 3f5c3157936e1..27cec07120c6e 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -60,8 +60,8 @@ class MissingAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "missing" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_missing_subset( + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_missing_subset( key=context.key, from_subset=context.candidate_subset ) @@ -95,8 +95,8 @@ class ExecutionFailedAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "execution_failed" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_execution_failed_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_execution_failed_subset(key=context.key) @whitelist_for_serdes diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py index a94d4138d3893..a66ed1281ea20 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py @@ -1,3 +1,4 @@ +import inspect from abc import abstractmethod from dagster._core.asset_graph_view.entity_subset import EntitySubset @@ -23,10 +24,14 @@ def compute_subset( self, context: AutomationContext[T_EntityKey] ) -> EntitySubset[T_EntityKey]: ... - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: # don't compute anything if there are no candidates if context.candidate_subset.is_empty: true_subset = context.get_empty_subset() + elif inspect.iscoroutinefunction(self.compute_subset): + true_subset = await self.compute_subset(context) else: true_subset = self.compute_subset(context)