Store allow_backfills in the sensor metadata
OwenKephart committed Oct 11, 2024
1 parent 55db5d0 commit 08182f2
Showing 15 changed files with 61 additions and 30 deletions.
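
At a glance, the user-facing change: AutomationConditionSensorDefinition now takes an explicit emit_backfills flag in place of the kwargs-only allow_backfills, and the flag is persisted in the sensor's metadata under the key "dagster/emit_backfills" rather than as a dedicated field. A minimal usage sketch, modeled on the test fixtures added in this commit (the asset and its automation condition are illustrative, not part of the diff):

import dagster as dg


@dg.asset(automation_condition=dg.AutomationCondition.eager())
def my_asset() -> None: ...


defs = dg.Definitions(
    assets=[my_asset],
    sensors=[
        dg.AutomationConditionSensorDefinition(
            name="the_sensor",
            asset_selection="*",
            # When True, any tick that requests more than one partition of a
            # single asset emits one backfill instead of one run per partition.
            emit_backfills=True,
        )
    ],
)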
@@ -92,7 +92,7 @@ def test_auto_materialize_perf(scenario: PerfScenario):
entity_keys=AssetSelection.all().resolve(asset_graph),
instance=instance,
asset_graph=asset_graph,
allow_backfills=False,
emit_backfills=False,
cursor=AssetDaemonCursor.empty(),
).evaluate()

@@ -17,6 +17,8 @@
from dagster._core.definitions.utils import check_valid_name
from dagster._utils.tags import normalize_tags

EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"


def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: SensorEvaluationContext):
from dagster._core.definitions.automation_tick_evaluation_context import (
@@ -41,7 +43,7 @@ def _evaluate(sensor_def: "AutomationConditionSensorDefinition", context: Sensor
observe_run_tags={},
auto_observe_asset_keys=set(),
asset_selection=sensor_def.asset_selection,
allow_backfills=sensor_def.allow_backfills,
emit_backfills=sensor_def.emit_backfills,
default_condition=sensor_def.default_condition,
logger=context.log,
).evaluate()
@@ -79,6 +81,8 @@ class AutomationConditionSensorDefinition(SensorDefinition):
sensor. The actual interval will be longer if the sensor evaluation takes longer than
the provided interval.
description (Optional[str]): A human-readable description of the sensor.
emit_backfills (bool): If set to True, will emit a backfill on any tick where more than one partition
of any single asset is requested, rather than individual runs. Defaults to False.
"""

def __init__(
Expand All @@ -92,17 +96,12 @@ def __init__(
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
metadata: Optional[Mapping[str, object]] = None,
emit_backfills: bool = False,
**kwargs,
):
self._user_code = kwargs.get("user_code", False)
self._allow_backfills = check.opt_bool_param(
kwargs.get("allow_backfills"), "allow_backfills", default=False
)
check.param_invariant(
not (self._allow_backfills and not self._user_code),
"allow_backfills",
"Setting `allow_backfills` for a non-user-code AutomationConditionSensorDefinition is not supported.",
)
check.bool_param(emit_backfills, "emit_backfills")

self._default_condition = check.opt_inst_param(
kwargs.get("default_condition"), "default_condition", AutomationCondition
)
@@ -114,6 +113,10 @@ def __init__(

self._run_tags = normalize_tags(run_tags)

# only store this value in the metadata if it's True
if emit_backfills:
metadata = {**(metadata or {}), EMIT_BACKFILLS_METADATA_KEY: True}

super().__init__(
name=check_valid_name(name),
job_name=None,
@@ -138,8 +141,8 @@ def asset_selection(self) -> AssetSelection:
return cast(AssetSelection, super().asset_selection)

@property
def allow_backfills(self) -> bool:
return self._allow_backfills
def emit_backfills(self) -> bool:
return EMIT_BACKFILLS_METADATA_KEY in self.metadata

@property
def default_condition(self) -> Optional[AutomationCondition]:
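
The crux of the file above: instead of keeping a private _allow_backfills attribute, the constructor folds the flag into the sensor's metadata (only when True), and the emit_backfills property derives its value from that metadata entry. A stripped-down, standalone sketch of that round trip (not the real class, just the storage pattern it uses):

from typing import Mapping, Optional

EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"


class SensorSketch:
    # Illustrative stand-in for AutomationConditionSensorDefinition's storage pattern.
    def __init__(
        self,
        metadata: Optional[Mapping[str, object]] = None,
        emit_backfills: bool = False,
    ):
        # Mirror of the commit: only write the key when the flag is True.
        if emit_backfills:
            metadata = {**(metadata or {}), EMIT_BACKFILLS_METADATA_KEY: True}
        self.metadata: Mapping[str, object] = metadata or {}

    @property
    def emit_backfills(self) -> bool:
        # Derived from metadata, so no separate field needs to survive serialization.
        return EMIT_BACKFILLS_METADATA_KEY in self.metadata


assert SensorSketch(emit_backfills=True).emit_backfills
assert not SensorSketch(metadata={"team": "data"}).emit_backfills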
@@ -61,7 +61,7 @@ def __init__(
auto_observe_asset_keys: AbstractSet[AssetKey],
asset_selection: AssetSelection,
logger: logging.Logger,
allow_backfills: bool,
emit_backfills: bool,
default_condition: Optional[AutomationCondition] = None,
evaluation_time: Optional[datetime.datetime] = None,
):
@@ -78,7 +78,7 @@ def __init__(
default_condition=default_condition,
instance=instance,
asset_graph=asset_graph,
allow_backfills=allow_backfills,
emit_backfills=emit_backfills,
cursor=cursor,
evaluation_time=evaluation_time,
logger=logger,
@@ -137,7 +137,7 @@ def _build_run_requests(self, entity_subsets: Sequence[EntitySubset]) -> Sequenc
entity_subsets=entity_subsets,
asset_graph=self.asset_graph,
run_tags=self._materialize_run_tags,
allow_backfills=self._evaluator.allow_backfills,
emit_backfills=self._evaluator.emit_backfills,
)

def _get_updated_cursor(
@@ -304,9 +304,9 @@ def build_run_requests(
entity_subsets: Sequence[EntitySubset],
asset_graph: BaseAssetGraph,
run_tags: Optional[Mapping[str, str]],
allow_backfills: bool,
emit_backfills: bool,
) -> Sequence[RunRequest]:
if allow_backfills:
if emit_backfills:
backfill_run_request, entity_subsets = _build_backfill_request(
entity_subsets, asset_graph, run_tags
)
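
For orientation, the hunk above is where the flag actually changes run-launching behavior: when emit_backfills is set, build_run_requests first carves out a backfill request before falling back to per-run requests. A schematic sketch of that decision in plain Python, with hypothetical shapes that are not Dagster's internal _build_backfill_request:

from typing import Dict, List, Tuple


def split_requests(
    requested: Dict[str, List[str]],  # asset key -> requested partition keys
    emit_backfills: bool,
) -> Tuple[List[dict], List[dict]]:
    # Returns (backfill_requests, run_requests); the dict shapes are illustrative.
    if emit_backfills and any(len(parts) > 1 for parts in requested.values()):
        # Collapse the multi-partition request into a single backfill covering it all.
        return [{"kind": "backfill", "targets": requested}], []
    # Otherwise, one run request per (asset, partition) pair.
    runs = [
        {"kind": "run", "asset": key, "partition": part}
        for key, parts in requested.items()
        for part in parts
    ]
    return [], runs


backfills, runs = split_requests({"daily_table": ["2024-10-09", "2024-10-10"]}, True)
assert backfills and not runs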
@@ -30,7 +30,7 @@ def __init__(
instance: DagsterInstance,
asset_graph: BaseAssetGraph,
cursor: AssetDaemonCursor,
allow_backfills: bool,
emit_backfills: bool,
default_condition: Optional[AutomationCondition] = None,
evaluation_time: Optional[datetime.datetime] = None,
logger: logging.Logger = logging.getLogger("dagster.automation"),
@@ -59,7 +59,7 @@ def __init__(
self.legacy_respect_materialization_data_versions = (
_instance.auto_materialize_respect_materialization_data_versions
)
self.allow_backfills = allow_backfills or _instance.da_request_backfills()
self.emit_backfills = emit_backfills or _instance.da_request_backfills()

self.legacy_expected_data_time_by_key: Dict[AssetKey, Optional[datetime.datetime]] = {}
self.legacy_data_time_resolver = CachingDataTimeResolver(self.instance_queryer)
@@ -128,7 +128,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
if asset_graph.get(key).automation_condition is not None
},
evaluation_time=evaluation_time,
allow_backfills=False,
emit_backfills=False,
logger=logging.getLogger("dagster.automation_condition_tester"),
# round-trip the provided cursor to simulate actual usage
cursor=deserialize_value(serialize_value(cursor), AssetDaemonCursor)
@@ -69,7 +69,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
)
condition_unqiue_id = condition.get_node_unique_id(parent_unique_id=None, index=None)

if condition.has_rule_condition and evaluator.allow_backfills:
if condition.has_rule_condition and evaluator.emit_backfills:
raise DagsterInvalidDefinitionError(
"Cannot use AutoMaterializePolicies and request backfills. Please use AutomationCondition or set DECLARATIVE_AUTOMATION_REQUEST_BACKFILLS to False."
)
11 changes: 10 additions & 1 deletion python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -19,6 +19,9 @@
)
from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.automation_condition_sensor_definition import (
EMIT_BACKFILLS_METADATA_KEY,
)
from dagster._core.definitions.automation_tick_evaluation_context import (
AutomationTickEvaluationContext,
)
@@ -940,6 +943,7 @@ def _evaluate_auto_materialize_tick(
).without_checks() | AssetSelection.checks(
*{key for key in auto_materialize_entity_keys if isinstance(key, AssetCheckKey)}
)

run_requests, new_cursor, evaluations = AutomationTickEvaluationContext(
evaluation_id=evaluation_id,
asset_graph=asset_graph,
@@ -954,7 +958,12 @@ def _evaluate_auto_materialize_tick(
**sensor_tags,
},
observe_run_tags={AUTO_OBSERVE_TAG: "true", **sensor_tags},
allow_backfills=False,
emit_backfills=bool(
sensor
and sensor.metadata
and sensor.metadata.standard_metadata
and EMIT_BACKFILLS_METADATA_KEY in sensor.metadata.standard_metadata
),
auto_observe_asset_keys=auto_observe_asset_keys,
logger=self._logger,
).evaluate()
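
On the daemon side (asset_daemon.py above), the flag is no longer read off a definition attribute; the daemon recovers it from the external sensor's standard metadata and passes the result as emit_backfills to AutomationTickEvaluationContext. A hedged sketch of that lookup with the sensor object stubbed out:

from types import SimpleNamespace

EMIT_BACKFILLS_METADATA_KEY = "dagster/emit_backfills"


def sensor_emits_backfills(sensor) -> bool:
    # Mirrors the check in the hunk above: the flag is True only if the metadata
    # entry written by AutomationConditionSensorDefinition is present.
    return bool(
        sensor is not None
        and sensor.metadata
        and sensor.metadata.standard_metadata
        and EMIT_BACKFILLS_METADATA_KEY in sensor.metadata.standard_metadata
    )


# Toy stand-in for the external sensor the daemon sees (not the real type).
stub = SimpleNamespace(
    metadata=SimpleNamespace(standard_metadata={EMIT_BACKFILLS_METADATA_KEY: True})
)
assert sensor_emits_backfills(stub)
assert not sensor_emits_backfills(None)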
@@ -0,0 +1,17 @@
import dagster as dg


def get_defs() -> dg.Definitions:
from .backfill_simple_user_code import defs as uc_defs # noqa

return dg.Definitions(
assets=uc_defs.assets,
sensors=[
dg.AutomationConditionSensorDefinition(
name="the_sensor", asset_selection="*", emit_backfills=True
)
],
)


defs = get_defs()
@@ -47,7 +47,7 @@ def E() -> None: ...
assets=[A, B, C, D, E],
sensors=[
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", user_code=True, allow_backfills=True
"the_sensor", asset_selection="*", user_code=True, emit_backfills=True
)
],
)
@@ -72,7 +72,7 @@ def outside2() -> dg.AssetCheckResult: ...
asset_checks=[outsideA, outsideB, outside1, outside2],
sensors=[
dg.AutomationConditionSensorDefinition(
"the_sensor", asset_selection="*", user_code=True, allow_backfills=True
"the_sensor", asset_selection="*", user_code=True, emit_backfills=True
)
],
)
@@ -5,6 +5,7 @@
from typing import AbstractSet, Mapping, Sequence, cast

import dagster._check as check
import pytest
from dagster import AssetMaterialization, RunsFilter, instance_for_test
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
@@ -418,9 +419,10 @@ def _get_subsets_by_key(
return {s.key: s for s in target_subset.iterate_asset_subsets(asset_graph)}


def test_backfill_creation_simple() -> None:
@pytest.mark.parametrize("location", ["backfill_simple_user_code", "backfill_simple_non_user_code"])
def test_backfill_creation_simple(location: str) -> None:
with get_workspace_request_context(
["backfill_simple"]
[location]
) as context, get_threadpool_executor() as executor:
asset_graph = context.create_request_context().asset_graph

@@ -143,7 +143,7 @@ def asset1(): ...
instance=instance,
cursor=AssetDaemonCursor.empty(),
materialize_run_tags={},
allow_backfills=False,
emit_backfills=False,
observe_run_tags={"tag1": "tag_value"},
logger=logging.getLogger("dagster.amp"),
).evaluate()
@@ -159,7 +159,7 @@ def _evaluate_tick_fast(
if self.asset_graph.get(key).auto_observe_interval_minutes is not None
},
logger=self.logger,
allow_backfills=False,
emit_backfills=False,
).evaluate()
check.is_list(new_run_requests, of_type=RunRequest)
check.inst(new_cursor, AssetDaemonCursor)
@@ -94,7 +94,7 @@ def evaluate(
0, 0, [], [self.condition_cursor] if self.condition_cursor else []
),
logger=self.logger,
allow_backfills=False,
emit_backfills=False,
)
evaluator.request_subsets_by_key = self._get_request_subsets_by_key(
evaluator.asset_graph_view
@@ -405,7 +405,7 @@ def test_time_fn():
materialize_run_tags={},
observe_run_tags={},
cursor=cursor,
allow_backfills=False,
emit_backfills=False,
auto_observe_asset_keys={
key
for key in asset_graph.observable_asset_keys