Skip to content

Commit

Permalink
Make compute check functions async
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 15, 2024
1 parent b0136c3 commit 20e09b1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ def compute_latest_time_window_subset(
else:
check.failed(f"Unsupported partitions_def: {partitions_def}")

def compute_subset_with_status(
async def compute_subset_with_status(
self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"]
):
"""Returns the subset of an asset check that matches a given status."""
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord

latest_record = AssetCheckExecutionRecord.blocking_get(self, key)
latest_record = await AssetCheckExecutionRecord.gen(self, key)
resolved_status = (
latest_record.resolve_status(self)
if latest_record and latest_record.targets_latest_materialization(self)
Expand All @@ -374,28 +374,32 @@ def compute_subset_with_status(
else:
return self.get_empty_subset(key=key)

def _compute_run_in_progress_check_subset(
async def _compute_run_in_progress_check_subset(
self, key: AssetCheckKey
) -> EntitySubset[AssetCheckKey]:
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionResolvedStatus,
)

return self.compute_subset_with_status(key, AssetCheckExecutionResolvedStatus.IN_PROGRESS)
return await self.compute_subset_with_status(
key, AssetCheckExecutionResolvedStatus.IN_PROGRESS
)

def _compute_execution_failed_check_subset(
async def _compute_execution_failed_check_subset(
self, key: AssetCheckKey
) -> EntitySubset[AssetCheckKey]:
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionResolvedStatus,
)

return self.compute_subset_with_status(
return await self.compute_subset_with_status(
key, AssetCheckExecutionResolvedStatus.EXECUTION_FAILED
)

def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]:
return self.compute_subset_with_status(key, None)
async def _compute_missing_check_subset(
self, key: AssetCheckKey
) -> EntitySubset[AssetCheckKey]:
return await self.compute_subset_with_status(key, None)

async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
Expand All @@ -411,7 +415,9 @@ async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySu
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
async def _compute_backfill_in_progress_asset_subset(
self, key: AssetKey
) -> EntitySubset[AssetKey]:
value = (
self._queryer.get_active_backfill_in_progress_asset_graph_subset()
.get_asset_subset(asset_key=key, asset_graph=self.asset_graph)
Expand Down Expand Up @@ -473,19 +479,22 @@ async def _compute_missing_asset_subset(
)

@cached_method
def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return _dispatch(
async def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return await _dispatch(
key=key,
check_method=self._compute_run_in_progress_check_subset,
asset_method=self._compute_run_in_progress_asset_subset,
)

@cached_method
def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return _dispatch(
async def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
async def get_empty_subset(key: EntityKey) -> EntitySubset:
return self.get_empty_subset(key=key)

return await _dispatch(
key=key,
# asset checks cannot currently be backfilled
check_method=lambda k: self.get_empty_subset(key=k),
check_method=get_empty_subset,
asset_method=self._compute_backfill_in_progress_asset_subset,
)

Expand All @@ -509,22 +518,22 @@ async def compute_missing_subset(
),
)

def _compute_updated_since_cursor_subset(
async def _compute_updated_since_cursor_subset(
self, key: AssetKey, cursor: Optional[int]
) -> EntitySubset[AssetKey]:
value = self._queryer.get_asset_subset_updated_after_cursor(
asset_key=key, after_cursor=cursor
).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_updated_since_time_subset(
async def _compute_updated_since_time_subset(
self, key: AssetCheckKey, time: datetime
) -> EntitySubset[AssetCheckKey]:
from dagster._core.events import DagsterEventType
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord

# intentionally left unimplemented for AssetKey, as this is a less performant query
record = AssetCheckExecutionRecord.blocking_get(self, key)
record = await AssetCheckExecutionRecord.gen(self, key)
if (
record is None
or record.event is None
Expand All @@ -536,10 +545,10 @@ def _compute_updated_since_time_subset(
return self.get_full_subset(key=key)

@cached_method
def compute_updated_since_temporal_context_subset(
async def compute_updated_since_temporal_context_subset(
self, *, key: EntityKey, temporal_context: TemporalContext
) -> EntitySubset:
return _dispatch(
return await _dispatch(
key=key,
check_method=functools.partial(
self._compute_updated_since_time_subset, time=temporal_context.effective_dt
Expand Down Expand Up @@ -627,11 +636,11 @@ def _build_multi_partition_subset(
async def _dispatch(
*,
key: EntityKey,
check_method: Callable[[AssetCheckKey], O_Dispatch],
check_method: Callable[[AssetCheckKey], Awaitable[O_Dispatch]],
asset_method: Callable[[AssetKey], Awaitable[O_Dispatch]],
) -> O_Dispatch:
"""Applies a method for either a check or an asset."""
if isinstance(key, AssetCheckKey):
return check_method(key)
return await check_method(key)
else:
return await asset_method(key)
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class RunInProgressAutomationCondition(SubsetAutomationCondition):
def name(self) -> str:
return "execution_in_progress"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_run_in_progress_subset(key=context.key)
async def compute_subset(self, context: AutomationContext) -> EntitySubset:
return await context.asset_graph_view.compute_run_in_progress_subset(key=context.key)


@whitelist_for_serdes
Expand All @@ -84,8 +84,8 @@ class BackfillInProgressAutomationCondition(SubsetAutomationCondition):
def name(self) -> str:
return "backfill_in_progress"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key)
async def compute_subset(self, context: AutomationContext) -> EntitySubset:
return await context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key)


@whitelist_for_serdes(storage_name="FailedAutomationCondition")
Expand Down Expand Up @@ -147,11 +147,11 @@ class NewlyUpdatedCondition(SubsetAutomationCondition):
def name(self) -> str:
return "newly_updated"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
async def compute_subset(self, context: AutomationContext) -> EntitySubset:
# if it's the first time evaluating, just return the empty subset
if context.previous_temporal_context is None:
return context.get_empty_subset()
return context.asset_graph_view.compute_updated_since_temporal_context_subset(
return await context.asset_graph_view.compute_updated_since_temporal_context_subset(
key=context.key, temporal_context=context.previous_temporal_context
)

Expand Down Expand Up @@ -240,7 +240,7 @@ class CheckResultCondition(SubsetAutomationCondition[AssetCheckKey]):
def name(self) -> str:
return "check_passed" if self.passed else "check_failed"

def compute_subset(
async def compute_subset(
self, context: AutomationContext[AssetCheckKey]
) -> EntitySubset[AssetCheckKey]:
from dagster._core.storage.asset_check_execution_record import (
Expand All @@ -252,6 +252,6 @@ def compute_subset(
if self.passed
else AssetCheckExecutionResolvedStatus.FAILED
)
return context.asset_graph_view.compute_subset_with_status(
return await context.asset_graph_view.compute_subset_with_status(
key=context.key, status=target_status
)

0 comments on commit 20e09b1

Please sign in to comment.