Skip to content

Commit

Permalink
Create backfill_in_progress AutomationCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 11, 2024
1 parent 9d0fd47 commit e6aa284
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def A() -> None: ...
assert record["numRequested"] == 0

# all nodes in the tree
assert len(record["evaluationNodes"]) == 28
assert len(record["evaluationNodes"]) == 32

rootNode = record["evaluationNodes"][0]
assert rootNode["uniqueId"] == record["rootUniqueId"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ def compute_subset_with_status(
else:
return self.get_empty_subset(key=key)

def _compute_in_progress_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]:
def _compute_run_in_progress_check_subset(
self, key: AssetCheckKey
) -> EntitySubset[AssetCheckKey]:
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionResolvedStatus,
)
Expand All @@ -384,10 +386,18 @@ def _compute_execution_failed_check_subset(
def _compute_missing_check_subset(self, key: AssetCheckKey) -> EntitySubset[AssetCheckKey]:
return self.compute_subset_with_status(key, None)

def _compute_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_in_progress_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_backfill_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = (
self._queryer.get_active_backfill_in_progress_asset_graph_subset()
.get_asset_subset(asset_key=key, asset_graph=self.asset_graph)
.value
)
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))

def _compute_execution_failed_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
value = self._queryer.get_failed_asset_subset(asset_key=key).value
return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value))
Expand Down Expand Up @@ -421,11 +431,20 @@ def _compute_missing_asset_subset(
)

@cached_method
def compute_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return _dispatch(
key=key,
check_method=self._compute_run_in_progress_check_subset,
asset_method=self._compute_run_in_progress_asset_subset,
)

@cached_method
def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset:
return _dispatch(
key=key,
check_method=self._compute_in_progress_check_subset,
asset_method=self._compute_in_progress_asset_subset,
# asset checks cannot currently be backfilled
check_method=lambda k: self.get_empty_subset(key=k),
asset_method=self._compute_backfill_in_progress_asset_subset,
)

@cached_method
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from dagster._core.definitions.declarative_automation.operands import (
BackfillInProgressAutomationCondition as BackfillInProgressAutomationCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressAutomationCondition as InProgressAutomationCondition,
MissingAutomationCondition as MissingAutomationCondition,
NewlyRequestedCondition as NewlyRequestedCondition,
NewlyUpdatedCondition as NewlyUpdatedCondition,
RunInProgressAutomationCondition as RunInProgressAutomationCondition,
WillBeRequestedCondition as WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operands.operands import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,24 @@ def missing() -> "BuiltinAutomationCondition":
@public
@experimental
@staticmethod
def in_progress() -> "BuiltinAutomationCondition":
def run_in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target is part of an in-progress run."""
from dagster._core.definitions.declarative_automation.operands import (
InProgressAutomationCondition,
RunInProgressAutomationCondition,
)

return RunInProgressAutomationCondition()

@public
@experimental
@staticmethod
def backfill_in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target is part of an in-progress backfill."""
from dagster._core.definitions.declarative_automation.operands import (
BackfillInProgressAutomationCondition,
)

return InProgressAutomationCondition()
return BackfillInProgressAutomationCondition()

@public
@experimental
Expand All @@ -344,6 +355,17 @@ def execution_failed() -> "BuiltinAutomationCondition":

return ExecutionFailedAutomationCondition()

@public
@experimental
@staticmethod
def in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it is part of an
in-progress run or backfill.
"""
return (
AutomationCondition.run_in_progress() | AutomationCondition.backfill_in_progress()
).with_label("in_progress")

@public
@experimental
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from dagster._core.definitions.declarative_automation.operands.operands import (
BackfillInProgressAutomationCondition as BackfillInProgressAutomationCondition,
CheckResultCondition as CheckResultCondition,
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressAutomationCondition as InProgressAutomationCondition,
MissingAutomationCondition as MissingAutomationCondition,
NewlyRequestedCondition as NewlyRequestedCondition,
NewlyUpdatedCondition as NewlyUpdatedCondition,
RunInProgressAutomationCondition as RunInProgressAutomationCondition,
WillBeRequestedCondition as WillBeRequestedCondition,
)
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,26 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset:
)


@whitelist_for_serdes(storage_name="InProgressAutomationCondition")
@record
class RunInProgressAutomationCondition(SubsetAutomationCondition):
@property
def name(self) -> str:
return "execution_in_progress"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_run_in_progress_subset(key=context.key)


@whitelist_for_serdes
@record
class InProgressAutomationCondition(SubsetAutomationCondition):
class BackfillInProgressAutomationCondition(SubsetAutomationCondition):
@property
def name(self) -> str:
return "in_progress"
return "backfill_in_progress"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_in_progress_subset(key=context.key)
return context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key)


@whitelist_for_serdes(storage_name="FailedAutomationCondition")
Expand Down
40 changes: 30 additions & 10 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
from dagster._core.execution.asset_backfill import AssetBackfillData
from dagster._core.storage.event_log import EventLogRecord
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
Expand Down Expand Up @@ -537,33 +538,52 @@ def get_current_materializations_for_run(self, *, run_id: str) -> AbstractSet[As
####################

@cached_method
def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset:
"""Returns an AssetGraphSubset representing the set of assets that are currently targeted by
an active asset backfill.
"""
def get_active_backfill_datas(self) -> Sequence["AssetBackfillData"]:
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus

asset_backfills = [
active_backfills = [
backfill
for backfill in self.instance.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED])
)
if backfill.is_asset_backfill
]

result = AssetGraphSubset()
for asset_backfill in asset_backfills:
backfill_datas = []
for backfill in active_backfills:
try:
asset_backfill_data = asset_backfill.get_asset_backfill_data(self.asset_graph)
backfill_datas.append(backfill.get_asset_backfill_data(self.asset_graph))
except DagsterDefinitionChangedDeserializationError:
self._logger.warning(
f"Not considering assets in backfill {asset_backfill.backfill_id} since its"
f"Not considering assets in backfill {backfill.backfill_id} since its"
" data could not be deserialized"
)
# Backfill can't be loaded, so no risk of the assets interfering
continue
return backfill_datas

result |= asset_backfill_data.target_subset
@cached_method
def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset:
"""Returns an AssetGraphSubset representing the set of assets that are currently targeted by
an active asset backfill.
"""
result = AssetGraphSubset()
for data in self.get_active_backfill_datas():
result |= data.target_subset

return result

@cached_method
def get_active_backfill_in_progress_asset_graph_subset(self) -> AssetGraphSubset:
"""Returns an AssetGraphSubset representing the set of assets that are currently targeted by
an active asset backfill and have not yet been materialized or failed.
"""
result = AssetGraphSubset()
for data in self.get_active_backfill_datas():
in_progress_subset = (
data.target_subset - data.materialized_subset - data.failed_and_downstream_subset
)
result |= in_progress_subset

return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
("dd74c7cfe19d869931ea4aad9ee10127", SC.on_cron("0 * * * *"), two_parents, False),
("861f8e40d4624d49c4ebdd034c8e1e84", SC.on_cron("0 * * * *"), two_parents_daily, False),
# same as above
("b5cb0d7a1c627bd2c9e7c6da3313ab71", SC.eager(), one_parent, False),
("7802a65024d04bbe44a3e0e541c0a577", SC.eager(), one_parent, True),
("5d9c70da7ecca9e40f32c1ad99956b5d", SC.eager(), one_parent_daily, False),
("904bac575906542d28b9e069129dad37", SC.eager(), two_parents, False),
("3ef1d373a2b38752ad8e23fe9c053d9f", SC.eager(), two_parents_daily, False),
("dfb268e321e2e7aa7b0e2e71fa674e06", SC.eager(), one_parent, False),
("781252e1a53db1ecd5938b0da61dba0b", SC.eager(), one_parent, True),
("293186887409aac2fe99b09bd633c64b", SC.eager(), one_parent_daily, False),
("c92d9d5181b4d0a6c7ab5d1c6e26962a", SC.eager(), two_parents, False),
("911bcc4f8904ec6dae85f6aaf78f5ee5", SC.eager(), two_parents_daily, False),
# missing condition is invariant to changes other than partitions def changes
("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, False),
("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, True),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import AbstractSet, Mapping, Sequence, cast

import dagster._check as check
import pytest
from dagster import AssetMaterialization, RunsFilter, instance_for_test
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
Expand Down Expand Up @@ -419,7 +418,6 @@ def _get_subsets_by_key(
return {s.key: s for s in target_subset.iterate_asset_subsets(asset_graph)}


@pytest.mark.skip("Pending change to in_progress() behavior")
def test_backfill_creation_simple() -> None:
with get_workspace_request_context(
["backfill_simple"]
Expand Down Expand Up @@ -462,7 +460,6 @@ def test_backfill_creation_simple() -> None:
assert len(runs) == 0


@pytest.mark.skip("Pending change to in_progress() behavior")
def test_backfill_with_runs_and_checks() -> None:
with get_workspace_request_context(
["backfill_with_runs_and_checks"]
Expand Down

0 comments on commit e6aa284

Please sign in to comment.