Skip to content

Commit

Permalink
W-16640190: Avoid deadlock when consuming the payload in the whenComp…
Browse files Browse the repository at this point in the history
…lete callback of sendAsync result (#653)
  • Loading branch information
eze210 authored Oct 3, 2024
1 parent b934dea commit 2cc6309
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ private CompletableFuture<HttpResponse> sendAsync(HttpRequest request, Request g
CompletableFuture<HttpResponse> auxFuture = new CompletableFuture<>();
if (streamingEnabled) {
asyncHandler =
new PreservingClassLoaderAsyncHandler<>(new ResponseBodyDeferringAsyncHandler(auxFuture, responseBufferSize));
new PreservingClassLoaderAsyncHandler<>(new ResponseBodyDeferringAsyncHandler(auxFuture, responseBufferSize,
workerScheduler));
} else {
asyncHandler = new PreservingClassLoaderAsyncHandler<>(new ResponseAsyncHandler(auxFuture));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH;
import static org.mule.runtime.http.api.HttpHeaders.Names.TRANSFER_ENCODING;

import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.service.http.impl.service.client.HttpResponseCreator;
import org.mule.service.http.impl.service.util.ThreadContext;
import org.mule.service.http.impl.util.TimedPipedInputStream;
import org.mule.service.http.impl.util.TimedPipedOutputStream;

Expand All @@ -42,6 +44,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -67,11 +72,12 @@ public class ResponseBodyDeferringAsyncHandler implements AsyncHandler<Response>
private static final Logger LOGGER = LoggerFactory.getLogger(ResponseBodyDeferringAsyncHandler.class);
private static final String PIPE_READ_TIMEOUT_PROPERTY_NAME =
SYSTEM_PROPERTY_PREFIX + "http.responseStreaming.pipeReadTimeoutMillis";
private static final long PIPE_READ_TIMEOUT_MILLIS = parseInt(getProperty(PIPE_READ_TIMEOUT_PROPERTY_NAME, "20000"));
private static long PIPE_READ_TIMEOUT_MILLIS = parseInt(getProperty(PIPE_READ_TIMEOUT_PROPERTY_NAME, "20000"));
private static Field responseField;

private volatile Response response;
private int bufferSize;
private final ExecutorService workerScheduler;
private OutputStream output;
private Optional<InputStream> input = empty();
private final CompletableFuture<HttpResponse> future;
Expand All @@ -91,9 +97,11 @@ public class ResponseBodyDeferringAsyncHandler implements AsyncHandler<Response>

private AtomicBoolean throwableReceived = new AtomicBoolean(false);

public ResponseBodyDeferringAsyncHandler(CompletableFuture<HttpResponse> future, int userDefinedBufferSize) throws IOException {
public ResponseBodyDeferringAsyncHandler(CompletableFuture<HttpResponse> future, int userDefinedBufferSize,
ExecutorService workerScheduler) {
this.future = future;
this.bufferSize = userDefinedBufferSize;
this.workerScheduler = workerScheduler;
this.mdc = MDC.getCopyOfContextMap();
}

Expand Down Expand Up @@ -284,14 +292,45 @@ public Response onCompleted() throws IOException {

private void handleIfNecessary() {
if (!handled.getAndSet(true)) {
response = responseBuilder.build();
try {
future.complete(httpResponseCreator.create(response, input.orElse(response.getResponseBodyAsStream())));
} catch (IOException e) {
// Make sure all resources are accounted for and since we've set the handled flag, handle the future explicitly
onThrowable(e);
future.completeExceptionally(e);
if (shouldCompleteAsync()) {
try {
LOGGER.debug("Scheduling response future completion to workers scheduler");
ClassLoader outerTccl = Thread.currentThread().getContextClassLoader();
workerScheduler.submit(() -> {
try (ThreadContext ctx = new ThreadContext(outerTccl, mdc)) {
completeResponseFuture();
}
});
} catch (RejectedExecutionException e) {
LOGGER.warn("Couldn't schedule completion to workers scheduler, completing it synchronously");
completeResponseFuture();
}
} else {
completeResponseFuture();
}
}
}

private boolean shouldCompleteAsync() {
return input.isPresent();
}

private void completeResponseFuture() {
response = responseBuilder.build();
try {
future.complete(httpResponseCreator.create(response, input.orElse(response.getResponseBodyAsStream())));
} catch (IOException e) {
// Make sure all resources are accounted for and since we've set the handled flag, handle the future explicitly
onThrowable(e);
future.completeExceptionally(e);
}
}

/**
* @deprecated Used only for testing
*/
@Deprecated
static void refreshSystemProperties() {
PIPE_READ_TIMEOUT_MILLIS = parseInt(getProperty(PIPE_READ_TIMEOUT_PROPERTY_NAME, "20000"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
* Auxiliary auto-closeable class to be used in a try-with-resources scope. Client would create an instance of ThreadContext
* passing a class loader and a map to be set as mdc while the instance is open. When the instance goes out of scope, it's closed
* and the previous class loader and mdc are restored.
*
* Usage example: <code>
* <p>
* Usage example:
*
* <pre>
* {@code
* try (ThreadContext tc = new ThreadContext(theClassLoader, theMDC)) {
* // The code in this scope will use theClassLoader and theMDC.
* }
* // Out of the scope, the code will use the outer class loader and mdc.
* </code>
* }
* </pre>
*
* It's only intended to be used in a try-with-resources block, avoid using it in another fashion.
*/
Expand Down
Loading

0 comments on commit 2cc6309

Please sign in to comment.