From 76a5921d70c7432a14cfdf9d5214ba26e0b21ef4 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 25 Mar 2024 16:29:06 -0700 Subject: [PATCH] Use completable future, fix sched-to-start timeout test --- .../internal/worker/ActivityPollTask.java | 9 +- .../worker/LocalActivityExecutionContext.java | 3 +- .../internal/worker/LocalActivityWorker.java | 84 +++++++++++-------- .../internal/worker/PollTaskExecutor.java | 3 +- .../internal/worker/WorkflowPollTask.java | 9 +- .../slotsupplier/FixedSizeSlotSupplier.java | 14 +++- .../worker/slotsupplier/SlotSupplier.java | 4 +- .../slotsupplier/TrackingSlotSupplier.java | 9 +- 8 files changed, 86 insertions(+), 49 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index a0381e3f0..8cc0a2426 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -35,6 +35,7 @@ import io.temporal.worker.MetricsType; import io.temporal.worker.slotsupplier.*; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -97,11 +98,15 @@ public ActivityTask poll() { try { permit = - slotSupplier.reserveSlot( - new SlotReservationData(pollRequest.getTaskQueue().getName(), false)); + slotSupplier + .reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName(), false)) + .get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; + } catch (ExecutionException e) { + log.warn("Error while trying to reserve a slot for an activity", e.getCause()); + return null; } try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java index 8dc5826e9..2129ede5f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java @@ -162,6 +162,7 @@ public boolean callback(LocalActivityResult result) { if (scheduleToCloseFuture != null) { scheduleToCloseFuture.cancel(false); } + // TODO: Inspect result and apply different reasons slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit); return executionResult.complete(result); } @@ -174,7 +175,7 @@ public void newAttempt() { executionParams.getOnNewAttemptCallback().apply(); } - public void setPermit(@Nonnull SlotPermit permit) { + public void setPermit(SlotPermit permit) { this.permit = permit; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 7d26c4833..4e4348fc0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -47,7 +47,6 @@ import io.temporal.workflow.Functions; import java.time.Duration; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.*; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -176,7 +175,6 @@ private RetryDecision shouldRetry( /** * @param executionContext execution context of the activity - * @param permit the slot permit for this LA execution * @param backoff delay time in milliseconds to the next attempt * @param failure if supplied, it will be used to override {@link * LocalActivityExecutionContext#getLastAttemptFailure()} @@ -259,49 +257,67 @@ public boolean dispatch( private boolean submitANewExecution( @Nonnull LocalActivityExecutionContext executionContext, @Nonnull PollActivityTaskQueueResponse.Builder activityTask, - @Nullable Deadline acceptanceDeadline) { + @Nullable Deadline heartbeatDeadline) { + long acceptanceTimeoutMs = 0; + boolean timeoutIsScheduleToStart = false; + if (heartbeatDeadline != null) { + acceptanceTimeoutMs = heartbeatDeadline.timeRemaining(TimeUnit.MILLISECONDS); + } + Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout(); + if (scheduleToStartTimeout != null) { + long scheduleToStartTimeoutMs = scheduleToStartTimeout.toMillis(); + if (scheduleToStartTimeoutMs > 0 && scheduleToStartTimeoutMs < acceptanceTimeoutMs) { + acceptanceTimeoutMs = scheduleToStartTimeoutMs; + timeoutIsScheduleToStart = true; + } + } try { SlotPermit permit = null; SlotReservationData reservationCtx = new SlotReservationData(taskQueue, false); - if (acceptanceDeadline == null) { - permit = slotSupplier.reserveSlot(reservationCtx); + if (acceptanceTimeoutMs <= 0) { + permit = slotSupplier.reserveSlot(reservationCtx).get(); } else { - long acceptanceTimeoutMs = acceptanceDeadline.timeRemaining(TimeUnit.MILLISECONDS); - // todo: use (acceptanceTimeoutMs, TimeUnit.MILLISECONDS) - Optional maybePermit = slotSupplier.tryReserveSlot(reservationCtx); - if (maybePermit.isPresent()) { - permit = maybePermit.get(); - } - if (permit == null) { - log.warn( - "LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}", - activityTask.getActivityId(), - acceptanceTimeoutMs); + try { + permit = + slotSupplier + .reserveSlot(reservationCtx) + .get(acceptanceTimeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // In the event that we timed out waiting for a permit *because of schedule to start* we + // still want to proceed with the "attempt" with a null permit, which will then + // immediately fail with the s2s timeout. + if (!timeoutIsScheduleToStart) { + log.warn( + "LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}", + activityTask.getActivityId(), + acceptanceTimeoutMs); + return false; + } } } - if (permit != null) { - executionContext.setPermit(permit); - // we should publish scheduleToClose before submission, so the handlers always see a full - // state of executionContext - @Nullable - Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline(); - if (scheduleToCloseDeadline != null) { - ScheduledFuture scheduleToCloseFuture = - scheduledExecutor.schedule( - new FinalTimeoutHandler( - TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext), - scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS), - TimeUnit.MILLISECONDS); - executionContext.setScheduleToCloseFuture(scheduleToCloseFuture); - } - submitAttempt(executionContext, activityTask); - log.trace("LocalActivity queued: {}", activityTask.getActivityId()); + executionContext.setPermit(permit); + // we should publish scheduleToClose before submission, so the handlers always see a full + // state of executionContext + @Nullable Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline(); + if (scheduleToCloseDeadline != null) { + ScheduledFuture scheduleToCloseFuture = + scheduledExecutor.schedule( + new FinalTimeoutHandler( + TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext), + scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + executionContext.setScheduleToCloseFuture(scheduleToCloseFuture); } - return permit != null; + submitAttempt(executionContext, activityTask); + log.warn("LocalActivity queued: {}", activityTask.getActivityId()); + return true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; + } catch (ExecutionException e) { + log.warn("Error while trying to reserve a slot for local activity", e.getCause()); + return false; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java index b35f79e86..b99b708d5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollTaskExecutor.java @@ -98,8 +98,7 @@ public void process(@Nonnull T task) { taskExecutor.execute( () -> { // TODO: Unclear if we want to keep this metric here, since users might define - // arbitrary/useless slot - // limits + // arbitrary/useless slot limits availableTaskSlots.decrementAndGet(); publishSlotsMetric(); try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index 417616787..1229941e1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -37,6 +37,7 @@ import io.temporal.worker.MetricsType; import io.temporal.worker.slotsupplier.*; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -123,11 +124,15 @@ public WorkflowTask poll() { SlotPermit permit; try { permit = - slotSupplier.reserveSlot( - new SlotReservationData(pollRequest.getTaskQueue().getName(), false)); + slotSupplier + .reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName(), false)) + .get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; + } catch (ExecutionException e) { + log.warn("Error while trying to reserve a slot for workflow task", e.getCause()); + return null; } // TODO: Needs to move above the slot reservation. Make sure this doesn't break stuff diff --git a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/FixedSizeSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/FixedSizeSlotSupplier.java index 6a6ee2fe0..7b940e50a 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/FixedSizeSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/FixedSizeSlotSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; public class FixedSizeSlotSupplier implements SlotSupplier { @@ -35,9 +36,16 @@ public FixedSizeSlotSupplier(int numSlots) { } @Override - public SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException { - executorSlotsSemaphore.acquire(); - return new SlotPermit(); + public CompletableFuture reserveSlot(SlotReservationContext ctx) { + return CompletableFuture.supplyAsync( + () -> { + try { + executorSlotsSemaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return new SlotPermit(); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/SlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/SlotSupplier.java index 22cf90105..7628e71b4 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/SlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/SlotSupplier.java @@ -21,10 +21,10 @@ package io.temporal.worker.slotsupplier; import java.util.Optional; +import java.util.concurrent.CompletableFuture; public interface SlotSupplier { - // TODO: Needs to be cancellable - SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException; + CompletableFuture reserveSlot(SlotReservationContext ctx); Optional tryReserveSlot(SlotReservationContext ctx); diff --git a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/TrackingSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/TrackingSlotSupplier.java index 209c3f28c..ecf6aa37d 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/TrackingSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/slotsupplier/TrackingSlotSupplier.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; // TODO: Also make pauseable? @@ -36,9 +37,9 @@ public TrackingSlotSupplier(SlotSupplier inner) { } @Override - public SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException { - SlotPermit p = inner.reserveSlot(ctx); - issuedSlots.incrementAndGet(); + public CompletableFuture reserveSlot(SlotReservationContext ctx) { + CompletableFuture p = inner.reserveSlot(ctx); + p.thenApply(_p -> issuedSlots.incrementAndGet()); return p; } @@ -53,12 +54,14 @@ public Optional tryReserveSlot(SlotReservationContext ctx) { @Override public void markSlotUsed(SlotInfo slotInfo, SlotPermit permit) { + System.out.println("Marking slot used: " + slotInfo + " With permit: " + permit); inner.markSlotUsed(slotInfo, permit); usedSlots.put(permit, slotInfo); } @Override public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) { + System.out.println("Releasing slot: " + permit); inner.releaseSlot(reason, permit); issuedSlots.decrementAndGet(); usedSlots.remove(permit);