Skip to content

Commit

Permalink
Switch getInitialCommandEventId from sentinel values to OptionalLong.
Browse files Browse the repository at this point in the history
  • Loading branch information
chronos-tachyon committed Mar 2, 2024
1 parent 5bb3494 commit 4327cca
Showing 1 changed file with 70 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ enum HandleEventStatus {

/** Initial set of SDK flags that will be set on all new workflow executions. */
private static final List<SdkFlag> initialFlags =
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
Collections.unmodifiableList(
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));

/**
* EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
Expand Down Expand Up @@ -157,7 +158,7 @@ enum HandleEventStatus {

private final WFTBuffer wftBuffer = new WFTBuffer();

private List<Message> messages = new ArrayList<Message>();
private List<Message> messages = new ArrayList<>();

private final SdkFlags flags;

Expand Down Expand Up @@ -356,8 +357,8 @@ private void handleSingleEventLookahead(HistoryEvent event) {
}

private List<Message> takeLTE(long eventId) {
List<Message> m = new ArrayList<Message>();
List<Message> remainingMessages = new ArrayList<Message>();
List<Message> m = new ArrayList<>();
List<Message> remainingMessages = new ArrayList<>();
for (Message msg : this.messages) {
if (msg.getEventId() > eventId) {
remainingMessages.add(msg);
Expand Down Expand Up @@ -428,16 +429,16 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
replaying = false;
}

final long initialCommandEventId = getInitialCommandEventId(event);
if (initialCommandEventId < 0L) {
final OptionalLong initialCommandEventId = getInitialCommandEventId(event);
if (!initialCommandEventId.isPresent()) {
return;
}

EntityStateMachine c = stateMachines.get(initialCommandEventId);
EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong());
if (c != null) {
c.handleEvent(event, hasNextEvent);
if (c.isFinalState()) {
stateMachines.remove(initialCommandEventId);
stateMachines.remove(initialCommandEventId.getAsLong());
}
} else {
handleNonStatefulEvent(event, hasNextEvent);
Expand Down Expand Up @@ -587,9 +588,7 @@ public void sendMessage(Message message) {

public List<Message> takeMessages() {
List<Message> result = new ArrayList<>(messageOutbox.size());
for (Message message : messageOutbox) {
result.add(message);
}
result.addAll(messageOutbox);
messageOutbox.clear();
return result;
}
Expand Down Expand Up @@ -962,10 +961,9 @@ public boolean getVersion(
VersionStateMachine stateMachine =
versions.computeIfAbsent(
changeId,
(idKey) -> {
return VersionStateMachine.newInstance(
changeId, this::isReplaying, commandSink, stateMachineSink);
});
(idKey) ->
VersionStateMachine.newInstance(
changeId, this::isReplaying, commandSink, stateMachineSink));
return stateMachine.getVersion(
minSupported,
maxSupported,
Expand Down Expand Up @@ -1196,60 +1194,85 @@ public void updateRunId(String currentRunId) {
}
}

private long getInitialCommandEventId(HistoryEvent event) {
/**
* Extracts the eventId of the "initial command" for the given event.
*
* <p>The "initial command" is the event which started a group of related events:
* ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the
* event's own eventId is returned. If the event has an unknown type but is marked as ignorable,
* then {@link OptionalLong#empty()} is returned instead.
*
* @return the eventId of the initial command, or {@link OptionalLong#empty()}
*/
private OptionalLong getInitialCommandEventId(HistoryEvent event) {
switch (event.getEventType()) {
case EVENT_TYPE_ACTIVITY_TASK_STARTED:
return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_FAILED:
return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCanceledEventAttributes().getScheduledEventId());
case EVENT_TYPE_TIMER_FIRED:
return event.getTimerFiredEventAttributes().getStartedEventId();
return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId());
case EVENT_TYPE_TIMER_CANCELED:
return event.getTimerCanceledEventAttributes().getStartedEventId();
return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId());
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
return event
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId());
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
return event
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
.getInitiatedEventId());
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
return event
.getSignalExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId());
case EVENT_TYPE_WORKFLOW_TASK_STARTED:
return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_FAILED:
return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId());

case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
case EVENT_TYPE_TIMER_STARTED:
Expand All @@ -1268,11 +1291,11 @@ private long getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return event.getEventId();
return OptionalLong.of(event.getEventId());

default:
if (event.getWorkerMayIgnore()) {
return -1L;
return OptionalLong.empty();
}
throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
}
Expand Down

0 comments on commit 4327cca

Please sign in to comment.