Skip to content

Commit

Permalink
Address some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Apr 8, 2024
1 parent 20511af commit f27cfb6
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ public ActivityTask poll() {

try {
permit =
slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName()));
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ public Optional<SlotPermit> tryReserveActivitySlot(
return Optional.empty();
}
return ActivityWorker.this.slotSupplier.tryReserveSlot(
new SlotReservationData(ActivityWorker.this.taskQueue));
new SlotReservationData(
ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ public void close() {
}

private void releaseSlots(int slotsToRelease) {
if (slotsToRelease <= 0) return;
if (slotsToRelease == 0) return;
if (slotsToRelease < 0)
throw new IllegalArgumentException("Trying to release a negative number of activity slots");
if (slotsToRelease > this.reservedSlots.size())
throw new IllegalStateException(
"Trying to release more activity slots than outstanding reservations");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ private boolean submitANewExecution(
}
try {
SlotPermit permit = null;
SlotReservationData reservationCtx = new SlotReservationData(taskQueue);
SlotReservationData reservationCtx =
new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId());
if (acceptanceTimeoutMs <= 0) {
permit = slotSupplier.reserveSlot(reservationCtx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
package io.temporal.internal.worker;

public class SlotReservationData {
private final String taskQueue;
public final String taskQueue;
public final String workerIdentity;
public final String workerBuildId;

public SlotReservationData(String taskQueue) {
public SlotReservationData(String taskQueue, String workerIdentity, String workerBuildId) {
this.taskQueue = taskQueue;
}

public String getTaskQueue() {
return taskQueue;
this.workerIdentity = workerIdentity;
this.workerBuildId = workerBuildId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.Scope;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotReservationContext;
import io.temporal.worker.tuning.SlotSupplier;
import io.temporal.worker.tuning.*;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
Expand All @@ -40,7 +37,7 @@
*
* @param <SI> The slot info type
*/
public class TrackingSlotSupplier<SI> {
public class TrackingSlotSupplier<SI extends SlotInfo> {
private final SlotSupplier<SI> inner;
private final AtomicInteger issuedSlots = new AtomicInteger();
private final Map<SlotPermit, SI> usedSlots = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -69,7 +66,7 @@ public void markSlotUsed(SI slotInfo, SlotPermit permit) {
if (permit == null) {
return;
}
inner.markSlotUsed(slotInfo, permit);
inner.markSlotUsed(new SlotMarkUsedContextImpl(slotInfo, permit));
usedSlots.put(permit, slotInfo);
publishSlotsMetric();
}
Expand All @@ -78,7 +75,8 @@ public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) {
if (permit == null) {
return;
}
inner.releaseSlot(reason, permit);
SI slotInfo = usedSlots.get(permit);
inner.releaseSlot(new SlotReleaseContextImpl(reason, permit, slotInfo));
issuedSlots.decrementAndGet();
usedSlots.remove(permit);
publishSlotsMetric();
Expand Down Expand Up @@ -106,17 +104,29 @@ private void publishSlotsMetric() {
.update(maximumSlots() - usedSlots.size());
}

private SlotReservationContext<SI> createCtx(SlotReservationData dat) {
return new SlotResCtx(dat.getTaskQueue(), Collections.unmodifiableMap(usedSlots));
private SlotReserveContext<SI> createCtx(SlotReservationData dat) {
return new SlotReserveContextImpl(
dat.taskQueue,
Collections.unmodifiableMap(usedSlots),
dat.workerIdentity,
dat.workerBuildId);
}

private class SlotResCtx implements SlotReservationContext<SI> {
private class SlotReserveContextImpl implements SlotReserveContext<SI> {
private final String taskQueue;
private final Map<SlotPermit, SI> usedSlots;

private SlotResCtx(String taskQueue, Map<SlotPermit, SI> usedSlots) {
private final String workerIdentity;
private final String workerBuildId;

private SlotReserveContextImpl(
String taskQueue,
Map<SlotPermit, SI> usedSlots,
String workerIdentity,
String workerBuildId) {
this.taskQueue = taskQueue;
this.usedSlots = usedSlots;
this.workerIdentity = workerIdentity;
this.workerBuildId = workerBuildId;
}

@Override
Expand All @@ -125,8 +135,65 @@ public String getTaskQueue() {
}

@Override
public Map<SlotPermit, SI> usedSlots() {
public Map<SlotPermit, SI> getUsedSlots() {
return usedSlots;
}

@Override
public String getWorkerIdentity() {
return workerIdentity;
}

@Override
public String getWorkerBuildId() {
return workerBuildId;
}
}

private class SlotMarkUsedContextImpl implements SlotMarkUsedContext<SI> {
private final SI slotInfo;
private final SlotPermit slotPermit;

protected SlotMarkUsedContextImpl(SI slotInfo, SlotPermit slotPermit) {
this.slotInfo = slotInfo;
this.slotPermit = slotPermit;
}

@Override
public SI getSlotInfo() {
return slotInfo;
}

@Override
public SlotPermit getSlotPermit() {
return slotPermit;
}
}

private class SlotReleaseContextImpl implements SlotReleaseContext<SI> {
private final SlotPermit slotPermit;
private final SlotReleaseReason reason;
private final SI slotInfo;

protected SlotReleaseContextImpl(SlotReleaseReason reason, SlotPermit slotPermit, SI slotInfo) {
this.slotPermit = slotPermit;
this.reason = reason;
this.slotInfo = slotInfo;
}

@Override
public SlotReleaseReason getSlotReleaseReason() {
return reason;
}

@Override
public SlotPermit getSlotPermit() {
return slotPermit;
}

@Override
public SI getSlotInfo() {
return slotInfo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ public WorkflowTask poll() {
SlotPermit permit;
try {
permit =
slotSupplier.reserveSlot(new SlotReservationData(pollRequest.getTaskQueue().getName()));
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
return null;
}
return slotSupplier
.tryReserveSlot(new SlotReservationData(taskQueue))
.tryReserveSlot(
new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId()))
.map(
slotPermit ->
new WorkflowTaskDispatchHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.Objects;

/** Contains information about a slot that is being used to execute an activity task. */
public class ActivitySlotInfo {
public class ActivitySlotInfo extends SlotInfo {
private final ActivityInfo activityInfo;
private final String workerIdentity;
private final String workerBuildId;

public ActivitySlotInfo(ActivityInfo info, String workerIdentity, String workerBuildId) {
this.activityInfo = info;
public ActivitySlotInfo(ActivityInfo activityInfo, String workerIdentity, String workerBuildId) {
this.activityInfo = activityInfo;
this.workerIdentity = workerIdentity;
this.workerBuildId = workerBuildId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*
* @param <SI> The slot info type for this supplier.
*/
public class FixedSizeSlotSupplier<SI> implements SlotSupplier<SI> {
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
private final int numSlots;
private final Semaphore executorSlotsSemaphore;

Expand All @@ -41,13 +41,13 @@ public FixedSizeSlotSupplier(int numSlots) {
}

@Override
public SlotPermit reserveSlot(SlotReservationContext<SI> ctx) throws InterruptedException {
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
}

@Override
public Optional<SlotPermit> tryReserveSlot(SlotReservationContext<SI> ctx) {
public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
boolean gotOne = executorSlotsSemaphore.tryAcquire();
if (gotOne) {
return Optional.of(new SlotPermit());
Expand All @@ -56,10 +56,10 @@ public Optional<SlotPermit> tryReserveSlot(SlotReservationContext<SI> ctx) {
}

@Override
public void markSlotUsed(SI slotInfo, SlotPermit permit) {}
public void markSlotUsed(SlotMarkUsedContext<SI> ctx) {}

@Override
public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) {
public void releaseSlot(SlotReleaseContext<SI> ctx) {
executorSlotsSemaphore.release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Objects;

/** Contains information about a slot that is being used to execute a local activity. */
public class LocalActivitySlotInfo {
public class LocalActivitySlotInfo extends SlotInfo {
private final ActivityInfo activityInfo;
private final String workerIdentity;
private final String workerBuildId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.temporal.worker.tuning;

/** The base class that all slot info types used by {@link SlotSupplier} extend. */
public abstract class SlotInfo {
SlotInfo() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@

package io.temporal.worker.tuning;

import java.util.Map;

public interface SlotReservationContext<SI> {
public interface SlotMarkUsedContext<SI extends SlotInfo> {
/**
* @return the Task Queue for which this reservation request is associated.
* @return The information associated with the slot that is being marked as used.
*/
String getTaskQueue();
SI getSlotInfo();

/**
* @return A mapping of slot permits to the information associated with the in-use slot.
* @return The previously reserved permit that is being used with this slot.
*/
Map<SlotPermit, SI> usedSlots();
SlotPermit getSlotPermit();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.worker.tuning;

import javax.annotation.Nullable;

public interface SlotReleaseContext<SI extends SlotInfo> {
/**
* @return The reason the slot is being released.
*/
SlotReleaseReason getSlotReleaseReason();

/**
* @return The permit the slot was using that is now being released.
*/
SlotPermit getSlotPermit();

/**
* @return The information associated with the slot that is being released. May be null if the
* slot was never marked as used.
*/
@Nullable
SI getSlotInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

package io.temporal.worker.tuning;

import javax.annotation.Nullable;

public abstract class SlotReleaseReason {
SlotReleaseReason() {}

public static SlotReleaseReason taskComplete() {
return new TaskComplete();
Expand All @@ -42,7 +45,7 @@ public boolean isError() {
* @return the exception that caused the slot to be released, if this is a reason of type {@link
* Error}.
*/
public Exception getException() {
public @Nullable Exception getException() {
return null;
}

Expand Down
Loading

0 comments on commit f27cfb6

Please sign in to comment.