Skip to content

Commit

Permalink
pseudocode
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Oct 14, 2024
1 parent a0ba58c commit 525a63d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class BulkActionStatus(Enum):
CANCELED = "CANCELED"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED" # denotes that the backfill daemon completed successfully, but some runs failed
DELETING = "DELETING"

@staticmethod
def from_graphql_input(graphql_str):
Expand Down
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/_daemon/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dagster._core.execution.asset_backfill import execute_asset_backfill_iteration
from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus, PartitionBackfill
from dagster._core.execution.job_backfill import execute_job_backfill_iteration
from dagster._core.storage.dagster_run import RunsFilter
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._daemon.utils import DaemonErrorCapture
from dagster._time import get_current_datetime
Expand Down Expand Up @@ -59,13 +60,16 @@ def execute_backfill_iteration(
canceling_backfills = instance.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.CANCELING])
)
deleting_backfills = instance.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.DELETING])
)

if not in_progress_backfills and not canceling_backfills:
logger.debug("No backfill jobs in progress or canceling.")
if not in_progress_backfills and not canceling_backfills and not deleting_backfills:
logger.debug("No backfill jobs in progress, canceling, or deleting")
yield None
return

backfill_jobs = [*in_progress_backfills, *canceling_backfills]
backfill_jobs = [*in_progress_backfills, *canceling_backfills, *deleting_backfills]

yield from execute_backfill_jobs(
workspace_process_context, logger, backfill_jobs, debug_crash_flags
Expand Down Expand Up @@ -103,6 +107,15 @@ def execute_backfill_jobs(
)

try:
if backfill.status == BulkActionStatus.DELETING:
runs_in_backfill = instance.get_run_ids(
filters=RunsFilter.for_backfill(backfill.backfill_id)
)
for run_id in runs_in_backfill:
instance.delete_run(run_id)
yield None
instance.delete_backfill(backfill.backfill_id)
continue
if backfill.is_asset_backfill:
yield from execute_asset_backfill_iteration(
backfill, backfill_logger, workspace_process_context, instance
Expand Down

0 comments on commit 525a63d

Please sign in to comment.