diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 1301252c21d9c..aa3dd69fc9721 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -357,13 +357,13 @@ def compute_latest_time_window_subset( else: check.failed(f"Unsupported partitions_def: {partitions_def}") - def compute_subset_with_status( + async def compute_subset_with_status( self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"] ): """Returns the subset of an asset check that matches a given status.""" from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord - latest_record = AssetCheckExecutionRecord.blocking_get(self, key) + latest_record = await AssetCheckExecutionRecord.gen(self, key) resolved_status = ( latest_record.resolve_status(self) if latest_record and latest_record.targets_latest_materialization(self) @@ -374,28 +374,32 @@ def compute_subset_with_status( else: return self.get_empty_subset(key=key) - def _compute_run_in_progress_check_subset( + async def _compute_run_in_progress_check_subset( self, key: AssetCheckKey ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionResolvedStatus, ) - return self.compute_subset_with_status(key, AssetCheckExecutionResolvedStatus.IN_PROGRESS) + return await self.compute_subset_with_status( + key, AssetCheckExecutionResolvedStatus.IN_PROGRESS + ) - def _compute_execution_failed_check_subset( + async def _compute_execution_failed_check_subset( self, key: AssetCheckKey ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionResolvedStatus, ) - return self.compute_subset_with_status( + return await self.compute_subset_with_status( key, AssetCheckExecutionResolvedStatus.EXECUTION_FAILED ) - def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]: - return self.compute_subset_with_status(key, None) + async def _compute_missing_check_subset( + self, key: AssetCheckKey + ) -> EntitySubset[AssetCheckKey]: + return await self.compute_subset_with_status(key, None) async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: from dagster._core.storage.partition_status_cache import AssetStatusCacheValue @@ -411,7 +415,9 @@ async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySu value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]: + async def _compute_backfill_in_progress_asset_subset( + self, key: AssetKey + ) -> EntitySubset[AssetKey]: value = ( self._queryer.get_active_backfill_in_progress_asset_graph_subset() .get_asset_subset(asset_key=key, asset_graph=self.asset_graph) @@ -473,19 +479,22 @@ async def _compute_missing_asset_subset( ) @cached_method - def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + return await _dispatch( key=key, check_method=self._compute_run_in_progress_check_subset, asset_method=self._compute_run_in_progress_asset_subset, ) @cached_method - def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: - return _dispatch( + async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + async def get_empty_subset(key: EntityKey) -> EntitySubset: + return self.get_empty_subset(key=key) + + return await _dispatch( key=key, # asset checks cannot currently be backfilled - check_method=lambda k: self.get_empty_subset(key=k), + check_method=get_empty_subset, asset_method=self._compute_backfill_in_progress_asset_subset, ) @@ -509,7 +518,7 @@ async def compute_missing_subset( ), ) - def _compute_updated_since_cursor_subset( + async def _compute_updated_since_cursor_subset( self, key: AssetKey, cursor: Optional[int] ) -> EntitySubset[AssetKey]: value = self._queryer.get_asset_subset_updated_after_cursor( @@ -517,14 +526,14 @@ def _compute_updated_since_cursor_subset( ).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) - def _compute_updated_since_time_subset( + async def _compute_updated_since_time_subset( self, key: AssetCheckKey, time: datetime ) -> EntitySubset[AssetCheckKey]: from dagster._core.events import DagsterEventType from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord # intentionally left unimplemented for AssetKey, as this is a less performant query - record = AssetCheckExecutionRecord.blocking_get(self, key) + record = await AssetCheckExecutionRecord.gen(self, key) if ( record is None or record.event is None @@ -536,10 +545,10 @@ def _compute_updated_since_time_subset( return self.get_full_subset(key=key) @cached_method - def compute_updated_since_temporal_context_subset( + async def compute_updated_since_temporal_context_subset( self, *, key: EntityKey, temporal_context: TemporalContext ) -> EntitySubset: - return _dispatch( + return await _dispatch( key=key, check_method=functools.partial( self._compute_updated_since_time_subset, time=temporal_context.effective_dt @@ -627,11 +636,11 @@ def _build_multi_partition_subset( async def _dispatch( *, key: EntityKey, - check_method: Callable[[AssetCheckKey], O_Dispatch], + check_method: Callable[[AssetCheckKey], Awaitable[O_Dispatch]], asset_method: Callable[[AssetKey], Awaitable[O_Dispatch]], ) -> O_Dispatch: """Applies a method for either a check or an asset.""" if isinstance(key, AssetCheckKey): - return check_method(key) + return await check_method(key) else: return await asset_method(key) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 27cec07120c6e..0861c27fc37b9 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -73,8 +73,8 @@ class RunInProgressAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "execution_in_progress" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_run_in_progress_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_run_in_progress_subset(key=context.key) @whitelist_for_serdes @@ -84,8 +84,8 @@ class BackfillInProgressAutomationCondition(SubsetAutomationCondition): def name(self) -> str: return "backfill_in_progress" - def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key) + async def compute_subset(self, context: AutomationContext) -> EntitySubset: + return await context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key) @whitelist_for_serdes(storage_name="FailedAutomationCondition") @@ -147,11 +147,11 @@ class NewlyUpdatedCondition(SubsetAutomationCondition): def name(self) -> str: return "newly_updated" - def compute_subset(self, context: AutomationContext) -> EntitySubset: + async def compute_subset(self, context: AutomationContext) -> EntitySubset: # if it's the first time evaluating, just return the empty subset if context.previous_temporal_context is None: return context.get_empty_subset() - return context.asset_graph_view.compute_updated_since_temporal_context_subset( + return await context.asset_graph_view.compute_updated_since_temporal_context_subset( key=context.key, temporal_context=context.previous_temporal_context ) @@ -240,7 +240,7 @@ class CheckResultCondition(SubsetAutomationCondition[AssetCheckKey]): def name(self) -> str: return "check_passed" if self.passed else "check_failed" - def compute_subset( + async def compute_subset( self, context: AutomationContext[AssetCheckKey] ) -> EntitySubset[AssetCheckKey]: from dagster._core.storage.asset_check_execution_record import ( @@ -252,6 +252,6 @@ def compute_subset( if self.passed else AssetCheckExecutionResolvedStatus.FAILED ) - return context.asset_graph_view.compute_subset_with_status( + return await context.asset_graph_view.compute_subset_with_status( key=context.key, status=target_status )