Skip to content

Commit

Permalink
Make builder dumb again
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Apr 10, 2024
1 parent ffc4a1e commit 786ad6b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 33 deletions.
18 changes: 15 additions & 3 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ public final class Worker {
activityWorker = null;
} else {
TrackingSlotSupplier<ActivitySlotInfo> activitySlotSupplier =
new TrackingSlotSupplier<>(this.options.getActivitySlotSupplier());
new TrackingSlotSupplier<>(
this.options.getActivitySlotSupplier() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentActivityExecutionSize())
: this.options.getActivitySlotSupplier());

activityWorker =
new SyncActivityWorker(
Expand Down Expand Up @@ -138,9 +142,17 @@ public final class Worker {
factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);

TrackingSlotSupplier<WorkflowSlotInfo> workflowSlotSupplier =
new TrackingSlotSupplier<>(this.options.getWorkflowSlotSupplier());
new TrackingSlotSupplier<>(
this.options.getWorkflowSlotSupplier() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentWorkflowTaskExecutionSize())
: this.options.getWorkflowSlotSupplier());
TrackingSlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier =
new TrackingSlotSupplier<>(this.options.getLocalActivitySlotSupplier());
new TrackingSlotSupplier<>(
this.options.getLocalActivitySlotSupplier() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentLocalActivityExecutionSize())
: this.options.getLocalActivitySlotSupplier());
workflowWorker =
new SyncWorkflowWorker(
service,
Expand Down
72 changes: 42 additions & 30 deletions temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private Builder(WorkerOptions o) {
return;
}
this.maxWorkerActivitiesPerSecond = o.maxWorkerActivitiesPerSecond;
this.maxConcurrentActivityExecutionSize = o.maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = o.maxConcurrentWorkflowTaskExecutionSize;
this.maxConcurrentLocalActivityExecutionSize = o.maxConcurrentLocalActivityExecutionSize;
this.workflowSlotSupplier = o.workflowSlotSupplier;
this.activitySlotSupplier = o.activitySlotSupplier;
this.localActivitySlotSupplier = o.localActivitySlotSupplier;
Expand Down Expand Up @@ -416,6 +419,9 @@ public void setLocalActivitySlotSupplier(
public WorkerOptions build() {
return new WorkerOptions(
maxWorkerActivitiesPerSecond,
maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowTaskExecutionSize,
maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
Expand Down Expand Up @@ -448,34 +454,16 @@ public WorkerOptions validateAndBuildWithDefaults() {
Preconditions.checkState(
maxConcurrentActivityExecutionSize == 0,
"maxConcurrentActivityExecutionSize must not be set if activitySlotSupplier is set");
} else {
activitySlotSupplier =
new FixedSizeSlotSupplier<>(
maxConcurrentActivityExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE
: maxConcurrentActivityExecutionSize);
}
if (workflowSlotSupplier != null) {
Preconditions.checkState(
maxConcurrentWorkflowTaskExecutionSize == 0,
"maxConcurrentWorkflowTaskExecutionSize must not be set if workflowSlotSupplier is set");
} else {
workflowSlotSupplier =
new FixedSizeSlotSupplier<>(
maxConcurrentWorkflowTaskExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE
: maxConcurrentWorkflowTaskExecutionSize);
}
if (localActivitySlotSupplier != null) {
Preconditions.checkState(
maxConcurrentLocalActivityExecutionSize == 0,
"maxConcurrentLocalActivityExecutionSize must not be set if localActivitySlotSupplier is set");
} else {
localActivitySlotSupplier =
new FixedSizeSlotSupplier<>(
maxConcurrentLocalActivityExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE
: maxConcurrentLocalActivityExecutionSize);
}
Preconditions.checkState(
maxTaskQueueActivitiesPerSecond >= 0, "negative taskQueueActivitiesPerSecond");
Expand All @@ -500,15 +488,18 @@ public WorkerOptions validateAndBuildWithDefaults() {

return new WorkerOptions(
maxWorkerActivitiesPerSecond,
workflowSlotSupplier == null
? new FixedSizeSlotSupplier<>(DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE)
: workflowSlotSupplier,
activitySlotSupplier == null
? new FixedSizeSlotSupplier<>(DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE)
: activitySlotSupplier,
localActivitySlotSupplier == null
? new FixedSizeSlotSupplier<>(DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE)
: localActivitySlotSupplier,
maxConcurrentActivityExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE
: maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowTaskExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE
: maxConcurrentWorkflowTaskExecutionSize,
maxConcurrentLocalActivityExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE
: maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
maxTaskQueueActivitiesPerSecond,
maxConcurrentWorkflowTaskPollers == 0
? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS
Expand Down Expand Up @@ -539,6 +530,9 @@ public WorkerOptions validateAndBuildWithDefaults() {
}

private final double maxWorkerActivitiesPerSecond;
private final int maxConcurrentActivityExecutionSize;
private final int maxConcurrentWorkflowTaskExecutionSize;
private final int maxConcurrentLocalActivityExecutionSize;
private final SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier;
private final SlotSupplier<ActivitySlotInfo> activitySlotSupplier;
private final SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier;
Expand All @@ -557,6 +551,9 @@ public WorkerOptions validateAndBuildWithDefaults() {

private WorkerOptions(
double maxWorkerActivitiesPerSecond,
int maxConcurrentActivityExecutionSize,
int maxConcurrentWorkflowTaskExecutionSize,
int maxConcurrentLocalActivityExecutionSize,
SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier,
SlotSupplier<ActivitySlotInfo> activitySlotSupplier,
SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier,
Expand All @@ -573,6 +570,9 @@ private WorkerOptions(
String buildId,
Duration stickyTaskQueueDrainTimeout) {
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
this.workflowSlotSupplier = workflowSlotSupplier;
this.activitySlotSupplier = activitySlotSupplier;
this.localActivitySlotSupplier = localActivitySlotSupplier;
Expand All @@ -595,15 +595,15 @@ public double getMaxWorkerActivitiesPerSecond() {
}

public int getMaxConcurrentActivityExecutionSize() {
return activitySlotSupplier.getMaximumSlots();
return maxConcurrentActivityExecutionSize;
}

public int getMaxConcurrentWorkflowTaskExecutionSize() {
return workflowSlotSupplier.getMaximumSlots();
return maxConcurrentWorkflowTaskExecutionSize;
}

public int getMaxConcurrentLocalActivityExecutionSize() {
return localActivitySlotSupplier.getMaximumSlots();
return maxConcurrentLocalActivityExecutionSize;
}

public double getMaxTaskQueueActivitiesPerSecond() {
Expand Down Expand Up @@ -689,6 +689,9 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
WorkerOptions that = (WorkerOptions) o;
return compare(maxWorkerActivitiesPerSecond, that.maxWorkerActivitiesPerSecond) == 0
&& maxConcurrentActivityExecutionSize == that.maxConcurrentActivityExecutionSize
&& maxConcurrentWorkflowTaskExecutionSize == that.maxConcurrentWorkflowTaskExecutionSize
&& maxConcurrentLocalActivityExecutionSize == that.maxConcurrentLocalActivityExecutionSize
&& compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond) == 0
&& maxConcurrentWorkflowTaskPollers == that.maxConcurrentWorkflowTaskPollers
&& maxConcurrentActivityTaskPollers == that.maxConcurrentActivityTaskPollers
Expand All @@ -710,6 +713,9 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
public int hashCode() {
return Objects.hash(
maxWorkerActivitiesPerSecond,
maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowTaskExecutionSize,
maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
Expand All @@ -732,6 +738,12 @@ public String toString() {
return "WorkerOptions{"
+ "maxWorkerActivitiesPerSecond="
+ maxWorkerActivitiesPerSecond
+ ", maxConcurrentActivityExecutionSize="
+ maxConcurrentActivityExecutionSize
+ ", maxConcurrentWorkflowTaskExecutionSize="
+ maxConcurrentWorkflowTaskExecutionSize
+ ", maxConcurrentLocalActivityExecutionSize="
+ maxConcurrentLocalActivityExecutionSize
+ ", workflowSlotSupplier="
+ workflowSlotSupplier
+ ", activitySlotSupplier="
Expand Down

0 comments on commit 786ad6b

Please sign in to comment.