Skip to content

Commit

Permalink
simplify on_crash hook example (#15079)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Aug 26, 2024
1 parent 4c3e2c3 commit 66fe6e4
Showing 1 changed file with 9 additions and 21 deletions.
30 changes: 9 additions & 21 deletions docs/3.0rc/develop/state-hooks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -165,36 +165,24 @@ Here's how to create a hook that deletes a Cloud Run job if the flow run crashes
```python
import os
from prefect import flow, task
from prefect.variables import Variable
from prefect.client.orchestration import get_client
import prefect.runtime

async def delete_cloud_run_job(flow, flow_run, state):
"""Flow run state change hook that deletes a Cloud Run Job if
the flow run crashes."""
def delete_cloud_run_job(flow, flow_run, state):
"""hook that deletes the Cloud Run Job associated with the flow run."""

# retrieve Cloud Run job name
cloud_run_job_name = await Variable.get(
name="crashing-flow_cloud_run_job"
)

# delete Cloud Run job
delete_job_command = f"yes | gcloud beta run jobs delete
{cloud_run_job_name.value} --region us-central1"
os.system(delete_job_command)
cloud_run_job_name = flow_run.name

delete_cloud_run_job_command = (
"yes | gcloud beta run jobs delete"
f"{cloud_run_job_name} --region us-central1"
)
os.system(delete_cloud_run_job_command)

@task
def my_task_that_crashes():
raise SystemExit("Crashing on purpose!")
raise SystemExit("Crashing out! 💥")

@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
"""Save the flow run name (i.e. Cloud Run job name) as a
Variable. It then executes a task that ends up crashing."""
flow_run_name = prefect.runtime.flow_run.name
cloud_run_job_name = Variable.set(name="crashing_flow_cloud_run_job", value=flow_run_name, overwrite=True)

my_task_that_crashes()

if __name__ == "__main__":
Expand Down

0 comments on commit 66fe6e4

Please sign in to comment.