Skip to content

Commit

Permalink
random other things
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart authored and briantu committed Oct 17, 2024
1 parent d3710cb commit 4ccd03c
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 9 deletions.
1 change: 1 addition & 0 deletions profile.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dataclasses import dataclass, replace
from typing import Generic, Optional, Union

import dagster._check as check
Expand All @@ -10,12 +9,13 @@
PartitionsSubset,
)
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._serdes.serdes import DataclassSerializer, whitelist_for_serdes
from dagster._record import record, replace
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes

EntitySubsetValue = Union[bool, PartitionsSubset]


class EntitySubsetSerializer(DataclassSerializer):
class EntitySubsetSerializer(NamedTupleSerializer):
"""Ensures that the inner PartitionsSubset is converted to a serializable form if necessary."""

def get_storage_name(self) -> str:
Expand All @@ -33,7 +33,7 @@ def before_pack(self, value: "SerializableEntitySubset") -> "SerializableEntityS
storage_field_names={"key": "asset_key"},
old_storage_names={"AssetSubset"},
)
@dataclass(frozen=True)
@record(checked=False)
class SerializableEntitySubset(Generic[T_EntityKey]):
"""Represents a serializable subset of a given EntityKey."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
get_serializable_candidate_subset,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsSubset
from dagster._record import copy, record
from dagster._serdes.serdes import is_whitelisted_for_serdes_object
from dagster._time import get_current_timestamp
Expand Down Expand Up @@ -753,7 +753,7 @@ def node_cursor(self) -> Optional[AutomationConditionNodeCursor]:
extra_state=self._extra_state,
)

@cached_property
@property
def serializable_evaluation(self) -> AutomationConditionEvaluation:
return AutomationConditionEvaluation(
condition_snapshot=self.condition.get_node_snapshot(self.condition_unique_id),
Expand Down Expand Up @@ -826,7 +826,7 @@ def _compute_subset_value_str(subset: SerializableEntitySubset) -> str:
return str(subset.value)
elif isinstance(subset.value, AllPartitionsSubset):
return AllPartitionsSubset.__name__
elif isinstance(subset.value, BaseTimeWindowPartitionsSubset):
elif isinstance(subset.value, TimeWindowPartitionsSubset):
return str(
[
(tw.start.timestamp(), tw.end.timestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def frozen_metadata(self) -> FrozenSet[Tuple[str, MetadataValue]]:


@whitelist_for_serdes(storage_name="AssetConditionEvaluation")
@dataclass
@record
class AutomationConditionEvaluation(Generic[T_EntityKey]):
"""Serializable representation of the results of evaluating a node in the evaluation tree."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ def get_asset_status_cache_values(
values = []
for asset_key, partitions_def in partitions_defs_by_key.items():
values.append(
get_and_update_asset_status_cache_value(self._instance, asset_key, partitions_def)
get_and_update_asset_status_cache_value(
self._instance,
asset_key,
partitions_def, # loading_context=loading_context
)
)
return values

0 comments on commit 4ccd03c

Please sign in to comment.