From 1daf4582bbaf4fdf36b0762a7dd758cfc63fbe11 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 9 Oct 2024 15:42:24 -0700 Subject: [PATCH] Call shutdownWorker on worker shutdown --- .../internal/worker/ShutdownManager.java | 31 +++++++++ .../internal/worker/WorkflowWorker.java | 68 ++++++++++++------- .../internal/worker/WorkflowWorkerTest.java | 7 ++ 3 files changed, 82 insertions(+), 24 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java index f790febb4..a98eae1d9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java @@ -20,6 +20,10 @@ package io.temporal.internal.worker; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse; import java.io.Closeable; import java.time.Duration; import java.util.concurrent.*; @@ -119,6 +123,33 @@ private CompletableFuture limitedWait( return future; } + /** + * Wait for {@code shutdownRequest} to finish. shutdownRequest is considered best effort, so we do + * not fail the shutdown if it fails. + */ + public CompletableFuture waitOnWorkerShutdownRequest( + ListenableFuture shutdownRequest) { + CompletableFuture future = new CompletableFuture<>(); + shutdownRequest.addListener( + () -> { + try { + shutdownRequest.get(); + } catch (StatusRuntimeException e) { + // If the server does not support shutdown, ignore the exception + if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) { + return; + } + log.warn("failed to call shutdown worker", e); + } catch (Exception e) { + log.warn("failed to call shutdown worker", e); + } finally { + future.complete(null); + } + }, + scheduledExecutorService); + return future; + } + @Override public void close() { scheduledExecutorService.shutdownNow(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index aa5f1d25b..407c5bc0c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -56,6 +56,7 @@ import org.slf4j.MDC; final class WorkflowWorker implements SuspendableWorker { + private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown"; private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class); private final WorkflowRunLockManager runLocks; @@ -162,30 +163,49 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean && stickyTaskQueueName != null && stickyQueueBalancer != null; - return CompletableFuture.completedFuture(null) - .thenCompose( - ignore -> - stickyQueueBalancerDrainEnabled - ? shutdownManager.waitForStickyQueueBalancer( - stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout()) - : CompletableFuture.completedFuture(null)) - .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks)) - .thenCompose( - ignore -> - !interruptTasks - ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( - slotSupplier, supplierName) - : CompletableFuture.completedFuture(null)) - .thenCompose( - ignore -> - pollTaskExecutor != null - ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) - : CompletableFuture.completedFuture(null)) - .exceptionally( - e -> { - log.error("Unexpected exception during shutdown", e); - return null; - }); + CompletableFuture pollerShutdown = + CompletableFuture.completedFuture(null) + .thenCompose( + ignore -> + stickyQueueBalancerDrainEnabled + ? shutdownManager.waitForStickyQueueBalancer( + stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout()) + : CompletableFuture.completedFuture(null)) + .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks)); + return CompletableFuture.allOf( + pollerShutdown.thenCompose( + ignore -> { + if (!interruptTasks && stickyTaskQueueName != null) { + return shutdownManager.waitOnWorkerShutdownRequest( + service + .futureStub() + .shutdownWorker( + ShutdownWorkerRequest.newBuilder() + .setIdentity(options.getIdentity()) + .setNamespace(namespace) + .setStickyTaskQueue(stickyTaskQueueName) + .setReason(GRACEFUL_SHUTDOWN_MESSAGE) + .build())); + } + return CompletableFuture.completedFuture(null); + }), + pollerShutdown + .thenCompose( + ignore -> + !interruptTasks + ? shutdownManager.waitForSupplierPermitsReleasedUnlimited( + slotSupplier, supplierName) + : CompletableFuture.completedFuture(null)) + .thenCompose( + ignore -> + pollTaskExecutor != null + ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks) + : CompletableFuture.completedFuture(null)) + .exceptionally( + e -> { + log.error("Unexpected exception during shutdown", e); + return null; + })); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index 94fb6571d..5f9249566 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; import com.uber.m3.tally.NoopScope; import com.uber.m3.tally.RootScopeBuilder; @@ -100,9 +101,15 @@ public void concurrentPollRequestLockTest() throws Exception { eagerActivityDispatcher, slotSupplier); + WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = + mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); + when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) + .thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); + WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); when(client.blockingStub()).thenReturn(blockingStub); + when(client.futureStub()).thenReturn(futureStub); when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); PollWorkflowTaskQueueResponse pollResponse =