From 4327cca379f702d262cb0006d86d5df1afc03788 Mon Sep 17 00:00:00 2001 From: Donald King Date: Fri, 1 Mar 2024 18:25:04 -0800 Subject: [PATCH] Switch getInitialCommandEventId from sentinel values to OptionalLong. --- .../statemachines/WorkflowStateMachines.java | 117 +++++++++++------- 1 file changed, 70 insertions(+), 47 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 091cb29eb..1d32b9bb5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -58,7 +58,8 @@ enum HandleEventStatus { /** Initial set of SDK flags that will be set on all new workflow executions. */ private static final List initialFlags = - Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)); + Collections.unmodifiableList( + Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)); /** * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker @@ -157,7 +158,7 @@ enum HandleEventStatus { private final WFTBuffer wftBuffer = new WFTBuffer(); - private List messages = new ArrayList(); + private List messages = new ArrayList<>(); private final SdkFlags flags; @@ -356,8 +357,8 @@ private void handleSingleEventLookahead(HistoryEvent event) { } private List takeLTE(long eventId) { - List m = new ArrayList(); - List remainingMessages = new ArrayList(); + List m = new ArrayList<>(); + List remainingMessages = new ArrayList<>(); for (Message msg : this.messages) { if (msg.getEventId() > eventId) { remainingMessages.add(msg); @@ -428,16 +429,16 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) { replaying = false; } - final long initialCommandEventId = getInitialCommandEventId(event); - if (initialCommandEventId < 0L) { + final OptionalLong initialCommandEventId = getInitialCommandEventId(event); + if (!initialCommandEventId.isPresent()) { return; } - EntityStateMachine c = stateMachines.get(initialCommandEventId); + EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong()); if (c != null) { c.handleEvent(event, hasNextEvent); if (c.isFinalState()) { - stateMachines.remove(initialCommandEventId); + stateMachines.remove(initialCommandEventId.getAsLong()); } } else { handleNonStatefulEvent(event, hasNextEvent); @@ -587,9 +588,7 @@ public void sendMessage(Message message) { public List takeMessages() { List result = new ArrayList<>(messageOutbox.size()); - for (Message message : messageOutbox) { - result.add(message); - } + result.addAll(messageOutbox); messageOutbox.clear(); return result; } @@ -962,10 +961,9 @@ public boolean getVersion( VersionStateMachine stateMachine = versions.computeIfAbsent( changeId, - (idKey) -> { - return VersionStateMachine.newInstance( - changeId, this::isReplaying, commandSink, stateMachineSink); - }); + (idKey) -> + VersionStateMachine.newInstance( + changeId, this::isReplaying, commandSink, stateMachineSink)); return stateMachine.getVersion( minSupported, maxSupported, @@ -1196,60 +1194,85 @@ public void updateRunId(String currentRunId) { } } - private long getInitialCommandEventId(HistoryEvent event) { + /** + * Extracts the eventId of the "initial command" for the given event. + * + *

The "initial command" is the event which started a group of related events: + * ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the + * event's own eventId is returned. If the event has an unknown type but is marked as ignorable, + * then {@link OptionalLong#empty()} is returned instead. + * + * @return the eventId of the initial command, or {@link OptionalLong#empty()} + */ + private OptionalLong getInitialCommandEventId(HistoryEvent event) { switch (event.getEventType()) { case EVENT_TYPE_ACTIVITY_TASK_STARTED: - return event.getActivityTaskStartedEventAttributes().getScheduledEventId(); + return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_COMPLETED: - return event.getActivityTaskCompletedEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getActivityTaskCompletedEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_FAILED: - return event.getActivityTaskFailedEventAttributes().getScheduledEventId(); + return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT: - return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getActivityTaskTimedOutEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED: - return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_CANCELED: - return event.getActivityTaskCanceledEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getActivityTaskCanceledEventAttributes().getScheduledEventId()); case EVENT_TYPE_TIMER_FIRED: - return event.getTimerFiredEventAttributes().getStartedEventId(); + return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId()); case EVENT_TYPE_TIMER_CANCELED: - return event.getTimerCanceledEventAttributes().getStartedEventId(); + return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId()); case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED: - return event - .getRequestCancelExternalWorkflowExecutionFailedEventAttributes() - .getInitiatedEventId(); + return OptionalLong.of( + event + .getRequestCancelExternalWorkflowExecutionFailedEventAttributes() + .getInitiatedEventId()); case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED: - return event - .getExternalWorkflowExecutionCancelRequestedEventAttributes() - .getInitiatedEventId(); + return OptionalLong.of( + event + .getExternalWorkflowExecutionCancelRequestedEventAttributes() + .getInitiatedEventId()); case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED: - return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED: - return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED: - return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED: - return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED: - return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT: - return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId()); case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED: - return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED: - return event - .getSignalExternalWorkflowExecutionFailedEventAttributes() - .getInitiatedEventId(); + return OptionalLong.of( + event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId()); case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED: - return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId(); + return OptionalLong.of( + event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId()); case EVENT_TYPE_WORKFLOW_TASK_STARTED: - return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId(); + return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId()); case EVENT_TYPE_WORKFLOW_TASK_COMPLETED: - return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId()); case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT: - return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId(); + return OptionalLong.of( + event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId()); case EVENT_TYPE_WORKFLOW_TASK_FAILED: - return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId(); + return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId()); case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: case EVENT_TYPE_TIMER_STARTED: @@ -1268,11 +1291,11 @@ private long getInitialCommandEventId(HistoryEvent event) { case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED: case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: - return event.getEventId(); + return OptionalLong.of(event.getEventId()); default: if (event.getWorkerMayIgnore()) { - return -1L; + return OptionalLong.empty(); } throw new IllegalArgumentException("Unexpected event type: " + event.getEventType()); }