Skip to content

Commit

Permalink
Do not lock in sticky choice ahead of reservation
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Apr 5, 2024
1 parent be21812 commit 767552a
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public ActivityTask poll() {

try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(pollRequest.getTaskQueue().getName(), false));
slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public Optional<SlotPermit> tryReserveActivitySlot(
return Optional.empty();
}
return ActivityWorker.this.slotSupplier.tryReserveSlot(
new SlotReservationData(ActivityWorker.this.taskQueue, false));
new SlotReservationData(ActivityWorker.this.taskQueue));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private boolean submitANewExecution(
}
try {
SlotPermit permit = null;
SlotReservationData reservationCtx = new SlotReservationData(taskQueue, false);
SlotReservationData reservationCtx = new SlotReservationData(taskQueue);
if (acceptanceTimeoutMs <= 0) {
permit = slotSupplier.reserveSlot(reservationCtx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@

public class SlotReservationData {
private final String taskQueue;
private final boolean sticky;

public SlotReservationData(String taskQueue, boolean sticky) {
public SlotReservationData(String taskQueue) {
this.taskQueue = taskQueue;
this.sticky = sticky;
}

/**
 * @return the task queue name this reservation data was constructed with
 */
public String getTaskQueue() {
return taskQueue;
}

public boolean isSticky() {
return sticky;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,25 @@ public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) {
/**
* @return task queue kind that should be used for the next poll
*/
public TaskQueueKind nextPollKind() {
public TaskQueueKind makePoll() {
if (stickyQueueEnabled) {
if (disableNormalPoll.get()) {
stickyPollers.incrementAndGet();
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
}
// If pollersCount >= stickyBacklogSize > 0, we go back to the normal ratio. Otherwise too many
// pollers (all of them in the worst case) could open only sticky queue polls after observing,
// for example, stickyBacklogSize == 1 (which may already be 0 by that moment) and get stuck,
// causing a dip in worker load.
if (stickyBacklogSize > pollersCount || stickyPollers.get() <= normalPollers.get()) {
stickyPollers.incrementAndGet();
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
}
}
normalPollers.incrementAndGet();
return TaskQueueKind.TASK_QUEUE_KIND_NORMAL;
}

/** Adjusts internal counters to reflect that a poll of the given kind has started */
public void startPoll(TaskQueueKind kind) {
switch (kind) {
case TASK_QUEUE_KIND_NORMAL:
normalPollers.incrementAndGet();
break;
case TASK_QUEUE_KIND_STICKY:
stickyPollers.incrementAndGet();
break;
}
}

/**
* @param taskQueueKind what kind of task queue poll was just finished
*/
Expand Down Expand Up @@ -102,4 +93,8 @@ public void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) {
public void disableNormalPoll() {
// Raises the flag that poll-kind selection checks first: while it is set and sticky queues are
// enabled, the sticky kind is chosen without consulting the backlog or poller counts.
disableNormalPoll.set(true);
}

/**
 * @return the current value of the normal-queue poller counter, which is incremented each time a
 *     normal task queue poll is chosen
 */
public int getNormalPollerCount() {
return normalPollers.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,15 @@ private void publishSlotsMetric() {
}

private SlotReservationContext<SI> createCtx(SlotReservationData dat) {
return new SlotResCtx(
dat.getTaskQueue(), dat.isSticky(), Collections.unmodifiableMap(usedSlots));
return new SlotResCtx(dat.getTaskQueue(), Collections.unmodifiableMap(usedSlots));
}

private class SlotResCtx implements SlotReservationContext<SI> {
private final String taskQueue;
private final boolean sticky;
private final Map<SlotPermit, SI> usedSlots;

private SlotResCtx(String taskQueue, boolean sticky, Map<SlotPermit, SI> usedSlots) {
private SlotResCtx(String taskQueue, Map<SlotPermit, SI> usedSlots) {
this.taskQueue = taskQueue;
this.sticky = sticky;
this.usedSlots = usedSlots;
}

Expand All @@ -123,11 +120,6 @@ public String getTaskQueue() {
return taskQueue;
}

@Override
public boolean isSticky() {
return sticky;
}

@Override
public Map<SlotPermit, SI> usedSlots() {
return usedSlots;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,9 @@ public WorkflowPollTask(
public WorkflowTask poll() {
boolean isSuccessful = false;
SlotPermit permit;
TaskQueueKind taskQueueKind = stickyQueueBalancer.nextPollKind();
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
taskQueueKind == TaskQueueKind.TASK_QUEUE_KIND_STICKY));
slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand All @@ -136,12 +132,12 @@ public WorkflowTask poll() {
return null;
}

stickyQueueBalancer.startPoll(taskQueueKind);
TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;
Scope scope = isSticky ? stickyMetricsScope : metricsScope;

log.trace("poll request begin: {}", request);
log.info("poll request begin: {}", request);
try {
PollWorkflowTaskQueueResponse response = doPoll(request, scope);
if (response == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
}
return slotSupplier
// Eager workflow tasks are always sticky by definition
.tryReserveSlot(new SlotReservationData(taskQueue, true))
.tryReserveSlot(new SlotReservationData(taskQueue))
.map(
slotPermit ->
new WorkflowTaskDispatchHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ public interface SlotReservationContext<SI> {
*/
String getTaskQueue();

/**
* @return true if the reservation request is for polling on a sticky workflow task queue.
*/
boolean isSticky();

/**
* @return A mapping of slot permits to the information associated with the in-use slot.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.worker.tuning;

import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import java.util.Objects;
Expand All @@ -33,32 +34,18 @@ public class WorkflowSlotInfo {
private final String runId;
private final String workerIdentity;
private final String workerBuildId;

public WorkflowSlotInfo(
String workflowType,
String taskQueue,
String workflowId,
String runId,
String workerIdentity,
String workerBuildId) {
this.workflowType = workflowType;
this.taskQueue = taskQueue;
this.workflowId = workflowId;
this.runId = runId;
this.workerIdentity = workerIdentity;
this.workerBuildId = workerBuildId;
}
private final boolean fromStickyQueue;

public WorkflowSlotInfo(
@Nonnull PollWorkflowTaskQueueResponse response,
@Nonnull PollWorkflowTaskQueueRequest request) {
this(
response.getWorkflowType().getName(),
request.getTaskQueue().getNormalName(),
response.getWorkflowExecution().getWorkflowId(),
response.getWorkflowExecution().getRunId(),
request.getIdentity(),
request.getWorkerVersionCapabilities().getBuildId());
this.workflowType = response.getWorkflowType().getName();
this.taskQueue = request.getTaskQueue().getNormalName();
this.workflowId = response.getWorkflowExecution().getWorkflowId();
this.runId = response.getWorkflowExecution().getRunId();
this.workerIdentity = request.getIdentity();
this.workerBuildId = request.getWorkerVersionCapabilities().getBuildId();
this.fromStickyQueue = request.getTaskQueue().getKind() == TaskQueueKind.TASK_QUEUE_KIND_STICKY;
}

public String getWorkflowType() {
Expand All @@ -85,12 +72,17 @@ public String getWorkerBuildId() {
return workerBuildId;
}

/**
 * @return true if the poll request this slot info was built from targeted a task queue whose kind
 *     is {@code TASK_QUEUE_KIND_STICKY}
 */
public boolean isFromStickyQueue() {
return fromStickyQueue;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WorkflowSlotInfo that = (WorkflowSlotInfo) o;
return Objects.equals(workflowType, that.workflowType)
return fromStickyQueue == that.fromStickyQueue
&& Objects.equals(workflowType, that.workflowType)
&& Objects.equals(taskQueue, that.taskQueue)
&& Objects.equals(workflowId, that.workflowId)
&& Objects.equals(runId, that.runId)
Expand All @@ -100,7 +92,8 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(workflowType, taskQueue, workflowId, runId, workerIdentity, workerBuildId);
return Objects.hash(
workflowType, taskQueue, workflowId, runId, workerIdentity, workerBuildId, fromStickyQueue);
}

@Override
Expand All @@ -124,6 +117,8 @@ public String toString() {
+ ", workerBuildId='"
+ workerBuildId
+ '\''
+ ", fromStickyQueue="
+ fromStickyQueue
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ public void stickyQueueBacklogResetTest() {
} else {
assertNull(poller.poll());
}
TaskQueueKind nextKind = stickyQueueBalancer.nextPollKind();
assertEquals(TaskQueueKind.TASK_QUEUE_KIND_STICKY, nextKind);
stickyQueueBalancer.startPoll(nextKind);
assertEquals(TaskQueueKind.TASK_QUEUE_KIND_STICKY, stickyQueueBalancer.makePoll());
// If the backlog was not reset this would be a sticky task
assertEquals(TaskQueueKind.TASK_QUEUE_KIND_NORMAL, stickyQueueBalancer.nextPollKind());
assertEquals(TaskQueueKind.TASK_QUEUE_KIND_NORMAL, stickyQueueBalancer.makePoll());
}
}

0 comments on commit 767552a

Please sign in to comment.