diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index d0adf3adbc..378fb4a45d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -98,8 +98,7 @@ public ActivityTask poll() { try { permit = - slotSupplier.reserveSlot( - new SlotReservationData(pollRequest.getTaskQueue().getName(), false)); + slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName())); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 6beee60567..b9213d9684 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -424,7 +424,7 @@ public Optional tryReserveActivitySlot( return Optional.empty(); } return ActivityWorker.this.slotSupplier.tryReserveSlot( - new SlotReservationData(ActivityWorker.this.taskQueue, false)); + new SlotReservationData(ActivityWorker.this.taskQueue)); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 29b166f765..7cb541a923 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -276,7 +276,7 @@ private boolean submitANewExecution( } try { SlotPermit permit = null; - SlotReservationData reservationCtx = new SlotReservationData(taskQueue, false); + SlotReservationData reservationCtx = new SlotReservationData(taskQueue); if (acceptanceTimeoutMs <= 0) { permit = slotSupplier.reserveSlot(reservationCtx); } else { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SlotReservationData.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SlotReservationData.java index 026b15a02a..ccfe4424ab 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SlotReservationData.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SlotReservationData.java @@ -22,18 +22,12 @@ public class SlotReservationData { private final String taskQueue; - private final boolean sticky; - public SlotReservationData(String taskQueue, boolean sticky) { + public SlotReservationData(String taskQueue) { this.taskQueue = taskQueue; - this.sticky = sticky; } public String getTaskQueue() { return taskQueue; } - - public boolean isSticky() { - return sticky; - } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java index 93fb7c698c..087a7b6144 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java @@ -43,9 +43,10 @@ public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) { /** * @return task queue kind that should be used for the next poll */ - public TaskQueueKind nextPollKind() { + public TaskQueueKind makePoll() { if (stickyQueueEnabled) { if (disableNormalPoll.get()) { + stickyPollers.incrementAndGet(); return TaskQueueKind.TASK_QUEUE_KIND_STICKY; } // If pollersCount >= stickyBacklogSize > 0 we want to go back to a normal ratio to avoid a @@ -53,24 +54,14 @@ public TaskQueueKind nextPollKind() { // polls observing a stickyBacklogSize == 1 for example (which actually can be 0 already at // that moment) and get stuck causing dip in worker load. if (stickyBacklogSize > pollersCount || stickyPollers.get() <= normalPollers.get()) { + stickyPollers.incrementAndGet(); return TaskQueueKind.TASK_QUEUE_KIND_STICKY; } } + normalPollers.incrementAndGet(); return TaskQueueKind.TASK_QUEUE_KIND_NORMAL; } - /** Adjusts internal counters to reflect that a poll of the given kind has started */ - public void startPoll(TaskQueueKind kind) { - switch (kind) { - case TASK_QUEUE_KIND_NORMAL: - normalPollers.incrementAndGet(); - break; - case TASK_QUEUE_KIND_STICKY: - stickyPollers.incrementAndGet(); - break; - } - } - /** * @param taskQueueKind what kind of task queue poll was just finished */ @@ -102,4 +93,8 @@ public void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) { public void disableNormalPoll() { disableNormalPoll.set(true); } + + public int getNormalPollerCount() { + return normalPollers.get(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java index 722634e9ed..37ca46ff0e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java @@ -103,18 +103,15 @@ private void publishSlotsMetric() { } private SlotReservationContext createCtx(SlotReservationData dat) { - return new SlotResCtx( - dat.getTaskQueue(), dat.isSticky(), Collections.unmodifiableMap(usedSlots)); + return new SlotResCtx(dat.getTaskQueue(), Collections.unmodifiableMap(usedSlots)); } private class SlotResCtx implements SlotReservationContext { private final String taskQueue; - private final boolean sticky; private final Map usedSlots; - private SlotResCtx(String taskQueue, boolean sticky, Map usedSlots) { + private SlotResCtx(String taskQueue, Map usedSlots) { this.taskQueue = taskQueue; - this.sticky = sticky; this.usedSlots = usedSlots; } @@ -123,11 +120,6 @@ public String getTaskQueue() { return taskQueue; } - @Override - public boolean isSticky() { - return sticky; - } - @Override public Map usedSlots() { return usedSlots; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index a4f395002a..74028c4611 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -121,13 +121,9 @@ public WorkflowPollTask( public WorkflowTask poll() { boolean isSuccessful = false; SlotPermit permit; - TaskQueueKind taskQueueKind = stickyQueueBalancer.nextPollKind(); try { permit = - slotSupplier.reserveSlot( - new SlotReservationData( - pollRequest.getTaskQueue().getName(), - taskQueueKind == TaskQueueKind.TASK_QUEUE_KIND_STICKY)); + slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName())); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; @@ -136,12 +132,12 @@ public WorkflowTask poll() { return null; } - stickyQueueBalancer.startPoll(taskQueueKind); + TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll(); boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind); PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest; Scope scope = isSticky ? stickyMetricsScope : metricsScope; - log.trace("poll request begin: {}", request); + log.info("poll request begin: {}", request); try { PollWorkflowTaskQueueResponse response = doPoll(request, scope); if (response == null) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 53ad93d951..9bf86af79d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -246,7 +246,7 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() { } return slotSupplier // Eager workflow tasks are always sticky by definition - .tryReserveSlot(new SlotReservationData(taskQueue, true)) + .tryReserveSlot(new SlotReservationData(taskQueue)) .map( slotPermit -> new WorkflowTaskDispatchHandle( diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReservationContext.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReservationContext.java index e8c4b567f7..41f653e78e 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReservationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReservationContext.java @@ -28,11 +28,6 @@ public interface SlotReservationContext { */ String getTaskQueue(); - /** - * @return true if the reservation request is for polling on a sticky workflow task queue. - */ - boolean isSticky(); - /** * @return A mapping of slot permits to the information associated with the in-use slot. */ diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkflowSlotInfo.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkflowSlotInfo.java index 3b02a718f3..8d42fdfe9f 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkflowSlotInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkflowSlotInfo.java @@ -20,6 +20,7 @@ package io.temporal.worker.tuning; +import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; import java.util.Objects; @@ -33,32 +34,18 @@ public class WorkflowSlotInfo { private final String runId; private final String workerIdentity; private final String workerBuildId; - - public WorkflowSlotInfo( - String workflowType, - String taskQueue, - String workflowId, - String runId, - String workerIdentity, - String workerBuildId) { - this.workflowType = workflowType; - this.taskQueue = taskQueue; - this.workflowId = workflowId; - this.runId = runId; - this.workerIdentity = workerIdentity; - this.workerBuildId = workerBuildId; - } + private final boolean fromStickyQueue; public WorkflowSlotInfo( @Nonnull PollWorkflowTaskQueueResponse response, @Nonnull PollWorkflowTaskQueueRequest request) { - this( - response.getWorkflowType().getName(), - request.getTaskQueue().getNormalName(), - response.getWorkflowExecution().getWorkflowId(), - response.getWorkflowExecution().getRunId(), - request.getIdentity(), - request.getWorkerVersionCapabilities().getBuildId()); + this.workflowType = response.getWorkflowType().getName(); + this.taskQueue = request.getTaskQueue().getNormalName(); + this.workflowId = response.getWorkflowExecution().getWorkflowId(); + this.runId = response.getWorkflowExecution().getRunId(); + this.workerIdentity = request.getIdentity(); + this.workerBuildId = request.getWorkerVersionCapabilities().getBuildId(); + this.fromStickyQueue = request.getTaskQueue().getKind() == TaskQueueKind.TASK_QUEUE_KIND_STICKY; } public String getWorkflowType() { @@ -85,12 +72,17 @@ public String getWorkerBuildId() { return workerBuildId; } + public boolean isFromStickyQueue() { + return fromStickyQueue; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WorkflowSlotInfo that = (WorkflowSlotInfo) o; - return Objects.equals(workflowType, that.workflowType) + return fromStickyQueue == that.fromStickyQueue + && Objects.equals(workflowType, that.workflowType) && Objects.equals(taskQueue, that.taskQueue) && Objects.equals(workflowId, that.workflowId) && Objects.equals(runId, that.runId) @@ -100,7 +92,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(workflowType, taskQueue, workflowId, runId, workerIdentity, workerBuildId); + return Objects.hash( + workflowType, taskQueue, workflowId, runId, workerIdentity, workerBuildId, fromStickyQueue); } @Override @@ -124,6 +117,8 @@ public String toString() { + ", workerBuildId='" + workerBuildId + '\'' + + ", fromStickyQueue=" + + fromStickyQueue + '}'; } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index dcf6989b29..0df7e783dc 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -132,10 +132,8 @@ public void stickyQueueBacklogResetTest() { } else { assertNull(poller.poll()); } - TaskQueueKind nextKind = stickyQueueBalancer.nextPollKind(); - assertEquals(TaskQueueKind.TASK_QUEUE_KIND_STICKY, nextKind); - stickyQueueBalancer.startPoll(nextKind); + assertEquals(TaskQueueKind.TASK_QUEUE_KIND_STICKY, stickyQueueBalancer.makePoll()); // If the backlog was not reset this would be a sticky task - assertEquals(TaskQueueKind.TASK_QUEUE_KIND_NORMAL, stickyQueueBalancer.nextPollKind()); + assertEquals(TaskQueueKind.TASK_QUEUE_KIND_NORMAL, stickyQueueBalancer.makePoll()); } }