Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

W-16892585: Response streamer doesn't honor the selector timeout the first time it's called #654

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLSession;
Expand All @@ -60,14 +61,16 @@
public class GrizzlyRequestDispatcherFilter extends BaseFilter {

private final RequestHandlerProvider requestHandlerProvider;
private final ExecutorService workerPool;

private final byte[] SERVER_NOT_AVAILABLE_CONTENT = ("Server not available to handle this request, either not initialized yet "
+ "or it has been disposed.").getBytes(defaultCharset());

private ConcurrentMap<ServerAddress, AtomicInteger> activeRequests = new ConcurrentHashMap<>();

GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider) {
GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider, ExecutorService workerPool) {
this.requestHandlerProvider = requestHandlerProvider;
this.workerPool = workerPool;
}

@Override
Expand Down Expand Up @@ -140,7 +143,7 @@ public void responseReady(HttpResponse response, ResponseStatusCallback response

if (response.getEntity().isStreaming()) {
new ResponseStreamingCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response,
requestAdapterNotifyingResponseStatusCallback).start();
requestAdapterNotifyingResponseStatusCallback, workerPool).start();
} else {
new ResponseCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response,
requestAdapterNotifyingResponseStatusCallback).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public GrizzlyServerManager(ExecutorService selectorPool,
this.httpListenerRegistry = httpListenerRegistry;
// TODO - MULE-14960: Remove system property once this can be configured through a file
this.serverTimeout = getInteger("mule.http.server.timeout", DEFAULT_SERVER_TIMEOUT_MILLIS);
requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry);
requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry, workerPool);
timeoutFilterDelegate = new GrizzlyAddressDelegateFilter<>();
sslFilterDelegate = new GrizzlyAddressDelegateFilter<>();
webSocketFilter = new GrizzlyAddressDelegateFilter<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@
*/
package org.mule.service.http.impl.service.server.grizzly;

import static com.google.common.base.Preconditions.checkArgument;
import static org.mule.runtime.api.util.DataUnit.KB;
import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED;
import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX;
import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader;
import static org.mule.runtime.core.api.util.StringUtils.isEmpty;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH;
import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR;

import static java.lang.Integer.valueOf;
import static java.lang.Math.min;
import static java.lang.System.getProperty;
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static com.google.common.base.Preconditions.checkArgument;
import static org.glassfish.grizzly.http.HttpServerFilter.RESPONSE_COMPLETE_EVENT;
import static org.glassfish.grizzly.nio.transport.TCPNIOTransport.MAX_SEND_BUFFER_SIZE;
import static org.mule.runtime.api.util.DataUnit.KB;
import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX;
import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED;
import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader;
import static org.mule.runtime.core.api.util.StringUtils.isEmpty;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH;
import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR;
import static org.slf4j.LoggerFactory.getLogger;

import org.mule.runtime.api.connection.SourceRemoteConnectionException;
Expand All @@ -33,6 +35,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
Expand Down Expand Up @@ -62,14 +66,17 @@ public class ResponseStreamingCompletionHandler extends BaseResponseCompletionHa

private static final String SELECTOR_TIMEOUT = SYSTEM_PROPERTY_PREFIX + "timeoutToUseSelectorWhileStreamingResponseMillis";
private final long selectorTimeoutNanos = MILLISECONDS.toNanos(Long.valueOf(getProperty(SELECTOR_TIMEOUT, "50")));
private final ExecutorService workerPool;

private volatile boolean isDone;
private boolean alreadyFailed = false;

public ResponseStreamingCompletionHandler(final FilterChainContext ctx,
ClassLoader ctxClassLoader,
final HttpRequestPacket request,
final HttpResponse httpResponse, ResponseStatusCallback responseStatusCallback) {
final HttpResponse httpResponse,
ResponseStatusCallback responseStatusCallback,
ExecutorService workerPool) {
checkArgument((httpResponse.getEntity().isStreaming()), "HTTP response entity must be stream based");
this.ctx = ctx;
this.ctxClassLoader = ctxClassLoader;
Expand All @@ -79,6 +86,7 @@ public ResponseStreamingCompletionHandler(final FilterChainContext ctx,
bufferSize = calculateBufferSize(ctx, ctxClassLoader);
this.responseStatusCallback = responseStatusCallback;
this.startTimeNanos = nanoTime();
this.workerPool = workerPool;
}

/**
Expand Down Expand Up @@ -117,6 +125,25 @@ private int calculateBufferSize(FilterChainContext ctx, ClassLoader ctxClassLoad
}

public void start() throws IOException {
if (isSelectorTimeout()) {
markConnectionToDelegateWritesInConfiguredExecutor(true);
try {
workerPool.submit(() -> {
try {
start0();
} catch (Exception exception) {
responseStatusCallback.onErrorSendingResponse(exception);
}
});
} catch (RejectedExecutionException ree) {
start0();
}
} else {
start0();
}
}

private void start0() throws IOException {
Thread thread = null;
ClassLoader currentClassLoader = null;
ClassLoader newClassLoader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,53 @@
*/
package org.mule.service.http.impl.service.server.grizzly;

import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES;
import static org.mule.tck.MuleTestUtils.testWithSystemProperty;

import static java.lang.Thread.currentThread;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.rules.ExpectedException.none;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES;

import io.qameta.allure.Issue;
import org.glassfish.grizzly.http.ProcessingState;
import org.junit.Rule;
import org.junit.Test;

import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;

import org.glassfish.grizzly.Transport;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import org.junit.rules.ExpectedException;
import org.glassfish.grizzly.Transport;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.ProcessingState;
import org.junit.Before;
import org.junit.Test;

@Feature(HTTP_SERVICE)
@Story(RESPONSES)
public class ResponseStreamingCompletionHandlerTestCase extends BaseResponseCompletionHandlerTestCase {

private ResponseStreamingCompletionHandler handler;
private InputStream mockStream;

@Rule
public ExpectedException exception = none();
private final ExecutorService workerPool = mock(ExecutorService.class);

@Before
public void setUp() {
Expand All @@ -62,7 +64,8 @@ public void setUp() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
}

@Override
Expand All @@ -80,7 +83,8 @@ public void keepAliveConnection() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
assertThat(handler.getHttpResponsePacket().getHeader(CONNECTION), equalTo(KEEP_ALIVE));
}

Expand All @@ -94,7 +98,8 @@ public void cLoseConnection() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
assertThat(getHandler().getHttpResponsePacket().getHeader(CONNECTION), equalTo(CLOSE));
}

Expand All @@ -106,7 +111,8 @@ public void completionHandlerFailsIfAReadOperationThrowsAMuleRuntimeException()
currentThread().getContextClassLoader(),
request,
responseMock,
callback));
callback,
workerPool));

when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new NullPointerException()));
handler.sendInputStreamChunk();
Expand All @@ -122,11 +128,10 @@ public void IOExceptionIsRethrownIfCauseOfFailure() throws IOException {
currentThread().getContextClassLoader(),
request,
responseMock,
callback));

exception.expect(IOException.class);
callback,
workerPool));
when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new IOException()));
handler.sendInputStreamChunk();
assertThrows(IOException.class, () -> handler.sendInputStreamChunk());
}

@Test
Expand All @@ -137,7 +142,8 @@ public void handlerDoesntThrowNPEWhenConnectionIsNull() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
// When an unexpected error makes getConnection return null.
when(ctx.getConnection()).thenReturn(null);
// Then the failed() method is executed without throwing NPE.
Expand All @@ -152,7 +158,8 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);

// When the failed() method is invoked several times
handler.failed(createExpectedException());
Expand All @@ -164,6 +171,41 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE
verify(callback, times(1)).onErrorSendingResponse(any(Exception.class));
}

@Test
@Issue("W-16892585")
public void whenTheTimeoutIsElapsedThenTheStartIsExecutedInTheWorkerScheduler() throws Exception {
responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build();
testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "0", () -> {
handler = new ResponseStreamingCompletionHandler(ctx,
currentThread().getContextClassLoader(),
request,
responseMock,
callback,
workerPool);

handler.start();
verify(workerPool, times(1)).submit(any(Runnable.class));
});
}

@Test
@Issue("W-16892585")
public void whenTheTimeoutIsNotElapsedThenTheStartIsNotExecutedInTheWorkerScheduler() throws Exception {
responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build();
testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "100000", () -> {
handler = new ResponseStreamingCompletionHandler(ctx,
currentThread().getContextClassLoader(),
request,
responseMock,
callback,
workerPool);

handler.start();
verify(workerPool, never()).submit(any(Runnable.class));
verify(ctx, times(1)).write(any(HttpContent.class), eq(handler));
});
}

private Exception createExpectedException() {
return new Exception("EXPECTED EXCEPTION");
}
Expand Down