Skip to content

Commit

Permalink
Call shutdown RPC on worker shutdown (#2264)
Browse files Browse the repository at this point in the history
Call shutdownWorker on worker shutdown
  • Loading branch information
Quinn-With-Two-Ns authored Oct 10, 2024
1 parent 393045d commit 92692b4
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

package io.temporal.internal.worker;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse;
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.*;
Expand Down Expand Up @@ -119,6 +123,33 @@ private CompletableFuture<Void> limitedWait(
return future;
}

/**
* Wait for {@code shutdownRequest} to finish. shutdownRequest is considered best effort, so we do
* not fail the shutdown if it fails.
*/
public CompletableFuture<Void> waitOnWorkerShutdownRequest(
ListenableFuture<ShutdownWorkerResponse> shutdownRequest) {
CompletableFuture<Void> future = new CompletableFuture<>();
shutdownRequest.addListener(
() -> {
try {
shutdownRequest.get();
} catch (StatusRuntimeException e) {
// If the server does not support shutdown, ignore the exception
if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
return;
}
log.warn("failed to call shutdown worker", e);
} catch (Exception e) {
log.warn("failed to call shutdown worker", e);
} finally {
future.complete(null);
}
},
scheduledExecutorService);
return future;
}

@Override
public void close() {
scheduledExecutorService.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.slf4j.MDC;

final class WorkflowWorker implements SuspendableWorker {
private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown";
private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

private final WorkflowRunLockManager runLocks;
Expand Down Expand Up @@ -162,30 +163,49 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
&& stickyTaskQueueName != null
&& stickyQueueBalancer != null;

return CompletableFuture.completedFuture(null)
.thenCompose(
ignore ->
stickyQueueBalancerDrainEnabled
? shutdownManager.waitForStickyQueueBalancer(
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
: CompletableFuture.completedFuture(null))
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
.thenCompose(
ignore ->
!interruptTasks
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
slotSupplier, supplierName)
: CompletableFuture.completedFuture(null))
.thenCompose(
ignore ->
pollTaskExecutor != null
? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
: CompletableFuture.completedFuture(null))
.exceptionally(
e -> {
log.error("Unexpected exception during shutdown", e);
return null;
});
CompletableFuture<Void> pollerShutdown =
CompletableFuture.completedFuture(null)
.thenCompose(
ignore ->
stickyQueueBalancerDrainEnabled
? shutdownManager.waitForStickyQueueBalancer(
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
: CompletableFuture.completedFuture(null))
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks));
return CompletableFuture.allOf(
pollerShutdown.thenCompose(
ignore -> {
if (!interruptTasks && stickyTaskQueueName != null) {
return shutdownManager.waitOnWorkerShutdownRequest(
service
.futureStub()
.shutdownWorker(
ShutdownWorkerRequest.newBuilder()
.setIdentity(options.getIdentity())
.setNamespace(namespace)
.setStickyTaskQueue(stickyTaskQueueName)
.setReason(GRACEFUL_SHUTDOWN_MESSAGE)
.build()));
}
return CompletableFuture.completedFuture(null);
}),
pollerShutdown
.thenCompose(
ignore ->
!interruptTasks
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
slotSupplier, supplierName)
: CompletableFuture.completedFuture(null))
.thenCompose(
ignore ->
pollTaskExecutor != null
? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
: CompletableFuture.completedFuture(null))
.exceptionally(
e -> {
log.error("Unexpected exception during shutdown", e);
return null;
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.RootScopeBuilder;
Expand Down Expand Up @@ -100,9 +101,15 @@ public void concurrentPollRequestLockTest() throws Exception {
eagerActivityDispatcher,
slotSupplier);

WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub =
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class);
when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class)))
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build()));

WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
when(client.blockingStub()).thenReturn(blockingStub);
when(client.futureStub()).thenReturn(futureStub);
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);

PollWorkflowTaskQueueResponse pollResponse =
Expand Down Expand Up @@ -259,9 +266,15 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception {
eagerActivityDispatcher,
slotSupplier);

WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub =
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class);
when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class)))
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build()));

WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
when(client.blockingStub()).thenReturn(blockingStub);
when(client.futureStub()).thenReturn(futureStub);
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);

PollWorkflowTaskQueueResponse pollResponse =
Expand Down

0 comments on commit 92692b4

Please sign in to comment.