Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add launcher to Graphene types for run and backfill #25236

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
AUTO_MATERIALIZE_TAG,
AUTO_OBSERVE_TAG,
SCHEDULE_NAME_TAG,
SENSOR_NAME_TAG,
USER_TAG,
TagType,
get_tag_type,
)
Expand All @@ -47,6 +52,7 @@
GrapheneUnauthorizedError,
create_execution_params_error_types,
)
from dagster_graphql.schema.launched_by import GrapheneLaunchedBy
from dagster_graphql.schema.pipelines.config import GrapheneRunConfigValidationInvalid
from dagster_graphql.schema.pipelines.status import GrapheneRunStatus
from dagster_graphql.schema.runs_feed import GrapheneRunsFeedEntry
Expand Down Expand Up @@ -385,6 +391,7 @@ class Meta:
assetCheckSelection = graphene.List(
graphene.NonNull("dagster_graphql.schema.asset_checks.GrapheneAssetCheckHandle")
)
launchedBy = graphene.NonNull(GrapheneLaunchedBy)

def __init__(self, backfill_job: PartitionBackfill):
self._backfill_job = check.inst_param(backfill_job, "backfill_job", PartitionBackfill)
Expand Down Expand Up @@ -520,6 +527,55 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
if get_tag_type(key) != TagType.HIDDEN
]

def resolve_launchedBy(self, _graphene_info: ResolveInfo):
"""Determines which value should be shown in the Launched by column in the UI. This value is
deetermined based on the tags of the backfill, not the source of the backfill as stored in the DB. Thus, some
backfills may have different launchedBy values from source columns.
"""
from dagster_graphql.schema.tags import GraphenePipelineTag

if self._backfill_job.tags.get(USER_TAG):
return GrapheneLaunchedBy(
kind="user",
tag=GraphenePipelineTag(key=USER_TAG, value=self._backfill_job.tags[USER_TAG]),
)
if self._backfill_job.tags.get(SCHEDULE_NAME_TAG):
return GrapheneLaunchedBy(
kind="schedule",
tag=GraphenePipelineTag(
key=SCHEDULE_NAME_TAG, value=self._backfill_job.tags[SCHEDULE_NAME_TAG]
),
)
if self._backfill_job.tags.get(SENSOR_NAME_TAG):
return GrapheneLaunchedBy(
kind="sensor",
tag=GraphenePipelineTag(
key=SENSOR_NAME_TAG, value=self._backfill_job.tags[SENSOR_NAME_TAG]
),
)
if self._backfill_job.tags.get(AUTO_MATERIALIZE_TAG):
return GrapheneLaunchedBy(
kind="auto-materialize",
tag=GraphenePipelineTag(
key=AUTO_MATERIALIZE_TAG, value=self._backfill_job.tags[AUTO_MATERIALIZE_TAG]
),
)
if self._backfill_job.tags.get("dagster/created_by") == "auto_materialize":
return GrapheneLaunchedBy(
kind="auto-materialize",
tag=GraphenePipelineTag(
key="dagster/created_by", value=self._backfill_job.tags["dagster/created_by"]
),
)
if self._backfill_job.tags.get(AUTO_OBSERVE_TAG):
return GrapheneLaunchedBy(
kind="auto-observe",
tag=GraphenePipelineTag(
key=AUTO_OBSERVE_TAG, value=self._backfill_job.tags[AUTO_OBSERVE_TAG]
),
)
return GrapheneLaunchedBy(kind="manual", tag=GraphenePipelineTag(key="", value=""))

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
return GrapheneBulkActionStatus(self.status).to_dagster_run_status()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import graphene

from dagster_graphql.schema.tags import GraphenePipelineTag


class GrapheneLaunchedBy(graphene.ObjectType):
kind = graphene.NonNull(graphene.String)
tag = graphene.NonNull(GraphenePipelineTag)

class Meta:
name = "LaunchedBy"

def __init__(self, kind: str, tag):
self.kind = kind
self.tag = tag
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@
RunRecord,
RunsFilter,
)
from dagster._core.storage.tags import REPOSITORY_LABEL_TAG, RUN_METRIC_TAGS, TagType, get_tag_type
from dagster._core.storage.tags import (
AUTO_MATERIALIZE_TAG,
AUTO_OBSERVE_TAG,
REPOSITORY_LABEL_TAG,
RUN_METRIC_TAGS,
SCHEDULE_NAME_TAG,
SENSOR_NAME_TAG,
USER_TAG,
TagType,
get_tag_type,
)
from dagster._core.workspace.permissions import Permissions
from dagster._utils.tags import get_boolean_tag_value
from dagster._utils.yaml_utils import dump_run_config_yaml
Expand Down Expand Up @@ -51,6 +61,7 @@
)
from dagster_graphql.schema.execution import GrapheneExecutionPlan
from dagster_graphql.schema.inputs import GrapheneAssetKeyInput
from dagster_graphql.schema.launched_by import GrapheneLaunchedBy
from dagster_graphql.schema.logs.compute_logs import GrapheneCapturedLogs, from_captured_log_data
from dagster_graphql.schema.logs.events import (
GrapheneDagsterRunEvent,
Expand Down Expand Up @@ -391,6 +402,7 @@ class GrapheneRun(graphene.ObjectType):
rootConcurrencyKeys = graphene.List(graphene.NonNull(graphene.String))
hasUnconstrainedRootNodes = graphene.NonNull(graphene.Boolean)
hasRunMetricsEnabled = graphene.NonNull(graphene.Boolean)
launchedBy = graphene.NonNull(GrapheneLaunchedBy)

class Meta:
interfaces = (GraphenePipelineRun, GrapheneRunsFeedEntry)
Expand Down Expand Up @@ -529,6 +541,53 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
if get_tag_type(key) != TagType.HIDDEN
]

def resolve_launchedBy(self, _graphene_info: ResolveInfo):
"""Determines which value should be shown in the Launched by column in the UI. This value is
deetermined based on the tags of the run, not the source of the run as stored in the DB. Thus, some
runs may have different launchedBy values from source columns.
"""
if self.dagster_run.tags.get(USER_TAG):
return GrapheneLaunchedBy(
kind="user",
tag=GraphenePipelineTag(key=USER_TAG, value=self.dagster_run.tags[USER_TAG]),
)
if self.dagster_run.tags.get(SCHEDULE_NAME_TAG):
return GrapheneLaunchedBy(
kind="schedule",
tag=GraphenePipelineTag(
key=SCHEDULE_NAME_TAG, value=self.dagster_run.tags[SCHEDULE_NAME_TAG]
),
)
if self.dagster_run.tags.get(SENSOR_NAME_TAG):
return GrapheneLaunchedBy(
kind="sensor",
tag=GraphenePipelineTag(
key=SENSOR_NAME_TAG, value=self.dagster_run.tags[SENSOR_NAME_TAG]
),
)
if self.dagster_run.tags.get(AUTO_MATERIALIZE_TAG):
return GrapheneLaunchedBy(
kind="auto-materialize",
tag=GraphenePipelineTag(
key=AUTO_MATERIALIZE_TAG, value=self.dagster_run.tags[AUTO_MATERIALIZE_TAG]
),
)
if self.dagster_run.tags.get("dagster/created_by") == "auto_materialize":
return GrapheneLaunchedBy(
kind="auto-materialize",
tag=GraphenePipelineTag(
key="dagster/created_by", value=self.dagster_run.tags["dagster/created_by"]
),
)
if self.dagster_run.tags.get(AUTO_OBSERVE_TAG):
return GrapheneLaunchedBy(
kind="auto-observe",
tag=GraphenePipelineTag(
key=AUTO_OBSERVE_TAG, value=self.dagster_run.tags[AUTO_OBSERVE_TAG]
),
)
return GrapheneLaunchedBy(kind="manual", tag=GraphenePipelineTag(key="", value=""))

def resolve_rootRunId(self, _graphene_info: ResolveInfo):
return self.dagster_run.root_run_id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dagster_graphql.schema.asset_key import GrapheneAssetKey
from dagster_graphql.schema.errors import GraphenePythonError
from dagster_graphql.schema.launched_by import GrapheneLaunchedBy
from dagster_graphql.schema.util import non_null_list


Expand All @@ -17,6 +18,7 @@ class GrapheneRunsFeedEntry(graphene.Interface):
assetCheckSelection = graphene.List(
graphene.NonNull("dagster_graphql.schema.asset_checks.GrapheneAssetCheckHandle")
)
launchedBy = graphene.NonNull(GrapheneLaunchedBy)

class Meta:
name = "RunsFeedEntry"
Expand Down