Skip to content

Commit

Permalink
Rewrite / cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Oct 4, 2024
1 parent 9192133 commit ae0a6e9
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 169 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,3 @@ class WorkflowExitType(IntEnum):
@dataclass
class WorkflowInput:
exit_type: WorkflowExitType


class OnWorkflowExitAction(IntEnum):
CONTINUE = 0
ABORT_WITH_COMPENSATION = 1


@dataclass
class UpdateInput:
on_premature_workflow_exit: OnWorkflowExitAction
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import asyncio

from temporalio import activity


@activity.defn
async def activity_executed_by_update_handler():
await asyncio.sleep(1)


@activity.defn
async def activity_executed_by_update_handler_to_perform_compensation():
await asyncio.sleep(1)
Original file line number Diff line number Diff line change
@@ -1,40 +1,37 @@
import asyncio

from temporalio import client
from temporalio import client, common

from message_passing.message_handler_waiting_compensation_cleanup import (
from message_passing.waiting_for_handlers_and_compensation import (
TASK_QUEUE,
WORKFLOW_ID,
OnWorkflowExitAction,
UpdateInput,
WorkflowExitType,
WorkflowInput,
)
from message_passing.message_handler_waiting_compensation_cleanup.workflows import (
MyWorkflow,
from message_passing.waiting_for_handlers_and_compensation.workflows import (
WaitingForHandlersAndCompensationWorkflow,
)


async def starter(exit_type: WorkflowExitType, update_action: OnWorkflowExitAction):
async def starter(exit_type: WorkflowExitType):
cl = await client.Client.connect("localhost:7233")
wf_handle = await cl.start_workflow(
MyWorkflow.run,
WaitingForHandlersAndCompensationWorkflow.run,
WorkflowInput(exit_type=exit_type),
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING,
)
await _check_run(wf_handle, exit_type, update_action)
await _check_run(wf_handle, exit_type)


async def _check_run(
wf_handle: client.WorkflowHandle,
exit_type: WorkflowExitType,
update_action: OnWorkflowExitAction,
):
try:
up_handle = await wf_handle.start_update(
MyWorkflow.my_update,
UpdateInput(on_premature_workflow_exit=update_action),
WaitingForHandlersAndCompensationWorkflow.my_update,
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
)
except Exception as e:
Expand All @@ -54,7 +51,7 @@ async def _check_run(
)

if exit_type == WorkflowExitType.CONTINUE_AS_NEW:
await _check_run(wf_handle, WorkflowExitType.SUCCESS, update_action)
await _check_run(wf_handle, WorkflowExitType.SUCCESS)
else:
try:
await wf_handle.result()
Expand All @@ -66,14 +63,14 @@ async def _check_run(


async def main():
for exit_type in WorkflowExitType:
for exit_type in [
WorkflowExitType.SUCCESS,
WorkflowExitType.FAILURE,
WorkflowExitType.CANCELLATION,
WorkflowExitType.CONTINUE_AS_NEW,
]:
print(f"\n\nworkflow exit type: {exit_type.name}")
for update_action in [
OnWorkflowExitAction.CONTINUE,
OnWorkflowExitAction.ABORT_WITH_COMPENSATION,
]:
print(f" update action on premature workflow exit: {update_action}")
await starter(exit_type, update_action)
await starter(exit_type)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.message_handler_waiting_compensation_cleanup import TASK_QUEUE
from message_passing.message_handler_waiting_compensation_cleanup.activities import (
my_activity,
from message_passing.waiting_for_handlers_and_compensation import TASK_QUEUE
from message_passing.waiting_for_handlers_and_compensation.activities import (
activity_executed_by_update_handler,
activity_executed_by_update_handler_to_perform_compensation,
)
from message_passing.message_handler_waiting_compensation_cleanup.workflows import (
MyWorkflow,
from message_passing.waiting_for_handlers_and_compensation.workflows import (
WaitingForHandlersAndCompensationWorkflow,
)

interrupt_event = asyncio.Event()
Expand All @@ -23,8 +24,11 @@ async def main():
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[MyWorkflow],
activities=[my_activity],
workflows=[WaitingForHandlersAndCompensationWorkflow],
activities=[
activity_executed_by_update_handler,
activity_executed_by_update_handler_to_perform_compensation,
],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
Expand Down
Loading

0 comments on commit ae0a6e9

Please sign in to comment.