Skip to content

Commit

Permalink
Use completable future, fix sched-to-start timeout test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Mar 25, 2024
1 parent 0c2a4a1 commit 76a5921
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.slotsupplier.*;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -97,11 +98,15 @@ public ActivityTask poll() {

try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(pollRequest.getTaskQueue().getName(), false));
slotSupplier
.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName(), false))
.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public boolean callback(LocalActivityResult result) {
if (scheduleToCloseFuture != null) {
scheduleToCloseFuture.cancel(false);
}
// TODO: Inspect result and apply different reasons
slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit);
return executionResult.complete(result);
}
Expand All @@ -174,7 +175,7 @@ public void newAttempt() {
executionParams.getOnNewAttemptCallback().apply();
}

public void setPermit(@Nonnull SlotPermit permit) {
public void setPermit(SlotPermit permit) {
this.permit = permit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.temporal.workflow.Functions;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -176,7 +175,6 @@ private RetryDecision shouldRetry(

/**
* @param executionContext execution context of the activity
* @param permit the slot permit for this LA execution
* @param backoff delay time in milliseconds to the next attempt
* @param failure if supplied, it will be used to override {@link
* LocalActivityExecutionContext#getLastAttemptFailure()}
Expand Down Expand Up @@ -259,49 +257,67 @@ public boolean dispatch(
private boolean submitANewExecution(
@Nonnull LocalActivityExecutionContext executionContext,
@Nonnull PollActivityTaskQueueResponse.Builder activityTask,
@Nullable Deadline acceptanceDeadline) {
@Nullable Deadline heartbeatDeadline) {
long acceptanceTimeoutMs = 0;
boolean timeoutIsScheduleToStart = false;
if (heartbeatDeadline != null) {
acceptanceTimeoutMs = heartbeatDeadline.timeRemaining(TimeUnit.MILLISECONDS);
}
Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout();
if (scheduleToStartTimeout != null) {
long scheduleToStartTimeoutMs = scheduleToStartTimeout.toMillis();
if (scheduleToStartTimeoutMs > 0 && scheduleToStartTimeoutMs < acceptanceTimeoutMs) {
acceptanceTimeoutMs = scheduleToStartTimeoutMs;
timeoutIsScheduleToStart = true;
}
}
try {
SlotPermit permit = null;
SlotReservationData reservationCtx = new SlotReservationData(taskQueue, false);
if (acceptanceDeadline == null) {
permit = slotSupplier.reserveSlot(reservationCtx);
if (acceptanceTimeoutMs <= 0) {
permit = slotSupplier.reserveSlot(reservationCtx).get();
} else {
long acceptanceTimeoutMs = acceptanceDeadline.timeRemaining(TimeUnit.MILLISECONDS);
// todo: use (acceptanceTimeoutMs, TimeUnit.MILLISECONDS)
Optional<SlotPermit> maybePermit = slotSupplier.tryReserveSlot(reservationCtx);
if (maybePermit.isPresent()) {
permit = maybePermit.get();
}
if (permit == null) {
log.warn(
"LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}",
activityTask.getActivityId(),
acceptanceTimeoutMs);
try {
permit =
slotSupplier
.reserveSlot(reservationCtx)
.get(acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// In the event that we timed out waiting for a permit *because of schedule to start* we
// still want to proceed with the "attempt" with a null permit, which will then
// immediately fail with the s2s timeout.
if (!timeoutIsScheduleToStart) {
log.warn(
"LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}",
activityTask.getActivityId(),
acceptanceTimeoutMs);
return false;
}
}
}

if (permit != null) {
executionContext.setPermit(permit);
// we should publish scheduleToClose before submission, so the handlers always see a full
// state of executionContext
@Nullable
Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline();
if (scheduleToCloseDeadline != null) {
ScheduledFuture<?> scheduleToCloseFuture =
scheduledExecutor.schedule(
new FinalTimeoutHandler(
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext),
scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
executionContext.setScheduleToCloseFuture(scheduleToCloseFuture);
}
submitAttempt(executionContext, activityTask);
log.trace("LocalActivity queued: {}", activityTask.getActivityId());
executionContext.setPermit(permit);
// we should publish scheduleToClose before submission, so the handlers always see a full
// state of executionContext
@Nullable Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline();
if (scheduleToCloseDeadline != null) {
ScheduledFuture<?> scheduleToCloseFuture =
scheduledExecutor.schedule(
new FinalTimeoutHandler(
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext),
scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
executionContext.setScheduleToCloseFuture(scheduleToCloseFuture);
}
return permit != null;
submitAttempt(executionContext, activityTask);
log.warn("LocalActivity queued: {}", activityTask.getActivityId());
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for local activity", e.getCause());
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public void process(@Nonnull T task) {
taskExecutor.execute(
() -> {
// TODO: Unclear if we want to keep this metric here, since users might define
// arbitrary/useless slot
// limits
// arbitrary/useless slot limits
availableTaskSlots.decrementAndGet();
publishSlotsMetric();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.slotsupplier.*;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -123,11 +124,15 @@ public WorkflowTask poll() {
SlotPermit permit;
try {
permit =
slotSupplier.reserveSlot(
new SlotReservationData(pollRequest.getTaskQueue().getName(), false));
slotSupplier
.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName(), false))
.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
return null;
}

// TODO: Needs to move above the slot reservation. Make sure this doesn't break stuff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class FixedSizeSlotSupplier<T> implements SlotSupplier<T> {
Expand All @@ -35,9 +36,16 @@ public FixedSizeSlotSupplier(int numSlots) {
}

@Override
public SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationContext ctx) {
return CompletableFuture.supplyAsync(
() -> {
try {
executorSlotsSemaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new SlotPermit();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
package io.temporal.worker.slotsupplier;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public interface SlotSupplier<SlotInfo> {
// TODO: Needs to be cancellable
SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException;
CompletableFuture<SlotPermit> reserveSlot(SlotReservationContext ctx);

Optional<SlotPermit> tryReserveSlot(SlotReservationContext ctx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// TODO: Also make pauseable?
Expand All @@ -36,9 +37,9 @@ public TrackingSlotSupplier(SlotSupplier<SlotInfo> inner) {
}

@Override
public SlotPermit reserveSlot(SlotReservationContext ctx) throws InterruptedException {
SlotPermit p = inner.reserveSlot(ctx);
issuedSlots.incrementAndGet();
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationContext ctx) {
CompletableFuture<SlotPermit> p = inner.reserveSlot(ctx);
p.thenApply(_p -> issuedSlots.incrementAndGet());
return p;
}

Expand All @@ -53,12 +54,14 @@ public Optional<SlotPermit> tryReserveSlot(SlotReservationContext ctx) {

@Override
public void markSlotUsed(SlotInfo slotInfo, SlotPermit permit) {
System.out.println("Marking slot used: " + slotInfo + " With permit: " + permit);
inner.markSlotUsed(slotInfo, permit);
usedSlots.put(permit, slotInfo);
}

@Override
public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) {
System.out.println("Releasing slot: " + permit);
inner.releaseSlot(reason, permit);
issuedSlots.decrementAndGet();
usedSlots.remove(permit);
Expand Down

0 comments on commit 76a5921

Please sign in to comment.