Skip to content

Commit

Permalink
honour selector timeout in first read
Browse files Browse the repository at this point in the history
  • Loading branch information
eze210 committed Oct 4, 2024
1 parent 2cc6309 commit e79db84
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLSession;
Expand All @@ -60,14 +61,16 @@
public class GrizzlyRequestDispatcherFilter extends BaseFilter {

private final RequestHandlerProvider requestHandlerProvider;
private final ExecutorService workerPool;

private final byte[] SERVER_NOT_AVAILABLE_CONTENT = ("Server not available to handle this request, either not initialized yet "
+ "or it has been disposed.").getBytes(defaultCharset());

private ConcurrentMap<ServerAddress, AtomicInteger> activeRequests = new ConcurrentHashMap<>();

GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider) {
GrizzlyRequestDispatcherFilter(final RequestHandlerProvider requestHandlerProvider, ExecutorService workerPool) {
this.requestHandlerProvider = requestHandlerProvider;
this.workerPool = workerPool;
}

@Override
Expand Down Expand Up @@ -140,7 +143,7 @@ public void responseReady(HttpResponse response, ResponseStatusCallback response

if (response.getEntity().isStreaming()) {
new ResponseStreamingCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response,
requestAdapterNotifyingResponseStatusCallback).start();
requestAdapterNotifyingResponseStatusCallback, workerPool).start();
} else {
new ResponseCompletionHandler(ctx, requestHandler.getContextClassLoader(), request, response,
requestAdapterNotifyingResponseStatusCallback).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public GrizzlyServerManager(ExecutorService selectorPool,
this.httpListenerRegistry = httpListenerRegistry;
// TODO - MULE-14960: Remove system property once this can be configured through a file
this.serverTimeout = getInteger("mule.http.server.timeout", DEFAULT_SERVER_TIMEOUT_MILLIS);
requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry);
requestHandlerFilter = new GrizzlyRequestDispatcherFilter(httpListenerRegistry, workerPool);
timeoutFilterDelegate = new GrizzlyAddressDelegateFilter<>();
sslFilterDelegate = new GrizzlyAddressDelegateFilter<>();
webSocketFilter = new GrizzlyAddressDelegateFilter<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@
*/
package org.mule.service.http.impl.service.server.grizzly;

import static com.google.common.base.Preconditions.checkArgument;
import static org.mule.runtime.api.util.DataUnit.KB;
import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED;
import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX;
import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader;
import static org.mule.runtime.core.api.util.StringUtils.isEmpty;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH;
import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR;

import static java.lang.Integer.valueOf;
import static java.lang.Math.min;
import static java.lang.System.getProperty;
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static com.google.common.base.Preconditions.checkArgument;
import static org.glassfish.grizzly.http.HttpServerFilter.RESPONSE_COMPLETE_EVENT;
import static org.glassfish.grizzly.nio.transport.TCPNIOTransport.MAX_SEND_BUFFER_SIZE;
import static org.mule.runtime.api.util.DataUnit.KB;
import static org.mule.runtime.api.util.MuleSystemProperties.SYSTEM_PROPERTY_PREFIX;
import static org.mule.runtime.api.util.MuleSystemProperties.MULE_LOG_SEPARATION_DISABLED;
import static org.mule.runtime.core.api.util.ClassUtils.setContextClassLoader;
import static org.mule.runtime.core.api.util.StringUtils.isEmpty;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONTENT_LENGTH;
import static org.mule.service.http.impl.service.server.grizzly.ExecutorPerServerAddressIOStrategy.DELEGATE_WRITES_IN_CONFIGURED_EXECUTOR;
import static org.slf4j.LoggerFactory.getLogger;

import org.mule.runtime.api.connection.SourceRemoteConnectionException;
Expand All @@ -33,6 +35,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
Expand Down Expand Up @@ -62,14 +66,17 @@ public class ResponseStreamingCompletionHandler extends BaseResponseCompletionHa

private static final String SELECTOR_TIMEOUT = SYSTEM_PROPERTY_PREFIX + "timeoutToUseSelectorWhileStreamingResponseMillis";
private final long selectorTimeoutNanos = MILLISECONDS.toNanos(Long.valueOf(getProperty(SELECTOR_TIMEOUT, "50")));
private final ExecutorService workerPool;

private volatile boolean isDone;
private boolean alreadyFailed = false;

public ResponseStreamingCompletionHandler(final FilterChainContext ctx,
ClassLoader ctxClassLoader,
final HttpRequestPacket request,
final HttpResponse httpResponse, ResponseStatusCallback responseStatusCallback) {
final HttpResponse httpResponse,
ResponseStatusCallback responseStatusCallback,
ExecutorService workerPool) {
checkArgument((httpResponse.getEntity().isStreaming()), "HTTP response entity must be stream based");
this.ctx = ctx;
this.ctxClassLoader = ctxClassLoader;
Expand All @@ -79,6 +86,7 @@ public ResponseStreamingCompletionHandler(final FilterChainContext ctx,
bufferSize = calculateBufferSize(ctx, ctxClassLoader);
this.responseStatusCallback = responseStatusCallback;
this.startTimeNanos = nanoTime();
this.workerPool = workerPool;
}

/**
Expand Down Expand Up @@ -117,6 +125,25 @@ private int calculateBufferSize(FilterChainContext ctx, ClassLoader ctxClassLoad
}

public void start() throws IOException {
if (isSelectorTimeout()) {
markConnectionToDelegateWritesInConfiguredExecutor(true);
try {
workerPool.submit(() -> {
try {
start0();
} catch (Exception exception) {
responseStatusCallback.onErrorSendingResponse(exception);
}
});
} catch (RejectedExecutionException ree) {
start0();
}
} else {
start0();
}
}

private void start0() throws IOException {
Thread thread = null;
ClassLoader currentClassLoader = null;
ClassLoader newClassLoader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,53 @@
*/
package org.mule.service.http.impl.service.server.grizzly;

import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES;
import static org.mule.tck.MuleTestUtils.testWithSystemProperty;

import static java.lang.Thread.currentThread;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.rules.ExpectedException.none;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mule.runtime.http.api.HttpHeaders.Names.CONNECTION;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HTTP_SERVICE;
import static org.mule.service.http.impl.AllureConstants.HttpFeature.HttpStory.RESPONSES;

import io.qameta.allure.Issue;
import org.glassfish.grizzly.http.ProcessingState;
import org.junit.Rule;
import org.junit.Test;

import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.http.api.domain.entity.InputStreamHttpEntity;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;

import org.glassfish.grizzly.Transport;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import org.junit.rules.ExpectedException;
import org.glassfish.grizzly.Transport;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.ProcessingState;
import org.junit.Before;
import org.junit.Test;

@Feature(HTTP_SERVICE)
@Story(RESPONSES)
public class ResponseStreamingCompletionHandlerTestCase extends BaseResponseCompletionHandlerTestCase {

private ResponseStreamingCompletionHandler handler;
private InputStream mockStream;

@Rule
public ExpectedException exception = none();
private final ExecutorService workerPool = mock(ExecutorService.class);

@Before
public void setUp() {
Expand All @@ -62,7 +64,8 @@ public void setUp() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
}

@Override
Expand All @@ -80,7 +83,8 @@ public void keepAliveConnection() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
assertThat(handler.getHttpResponsePacket().getHeader(CONNECTION), equalTo(KEEP_ALIVE));
}

Expand All @@ -94,7 +98,8 @@ public void cLoseConnection() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
assertThat(getHandler().getHttpResponsePacket().getHeader(CONNECTION), equalTo(CLOSE));
}

Expand All @@ -106,7 +111,8 @@ public void completionHandlerFailsIfAReadOperationThrowsAMuleRuntimeException()
currentThread().getContextClassLoader(),
request,
responseMock,
callback));
callback,
workerPool));

when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new NullPointerException()));
handler.sendInputStreamChunk();
Expand All @@ -122,11 +128,10 @@ public void IOExceptionIsRethrownIfCauseOfFailure() throws IOException {
currentThread().getContextClassLoader(),
request,
responseMock,
callback));

exception.expect(IOException.class);
callback,
workerPool));
when(mockStream.read(any(), anyInt(), anyInt())).thenThrow(new MuleRuntimeException(new IOException()));
handler.sendInputStreamChunk();
assertThrows(IOException.class, () -> handler.sendInputStreamChunk());
}

@Test
Expand All @@ -137,7 +142,8 @@ public void handlerDoesntThrowNPEWhenConnectionIsNull() {
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);
// When an unexpected error makes getConnection return null.
when(ctx.getConnection()).thenReturn(null);
// Then the failed() method is executed without throwing NPE.
Expand All @@ -152,7 +158,8 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE
currentThread().getContextClassLoader(),
request,
responseMock,
callback);
callback,
workerPool);

// When the failed() method is invoked several times
handler.failed(createExpectedException());
Expand All @@ -164,6 +171,39 @@ public void failedMethodBehaviorIsExecutedOnlyOnceForTheSameHandler() throws IOE
verify(callback, times(1)).onErrorSendingResponse(any(Exception.class));
}

@Test
public void whenTheTimeoutIsElapsedThenTheStartIsExecutedInTheWorkerScheduler() throws Exception {
responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build();
testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "0", () -> {
handler = new ResponseStreamingCompletionHandler(ctx,
currentThread().getContextClassLoader(),
request,
responseMock,
callback,
workerPool);

handler.start();
verify(workerPool, times(1)).submit(any(Runnable.class));
});
}

@Test
public void whenTheTimeoutIsNotElapsedThenTheStartIsNotExecutedInTheWorkerScheduler() throws Exception {
responseMock = HttpResponse.builder().entity(new InputStreamHttpEntity(mockStream)).build();
testWithSystemProperty("mule.timeoutToUseSelectorWhileStreamingResponseMillis", "100000", () -> {
handler = new ResponseStreamingCompletionHandler(ctx,
currentThread().getContextClassLoader(),
request,
responseMock,
callback,
workerPool);

handler.start();
verify(workerPool, never()).submit(any(Runnable.class));
verify(ctx, times(1)).write(any(HttpContent.class), eq(handler));
});
}

private Exception createExpectedException() {
return new Exception("EXPECTED EXCEPTION");
}
Expand Down

0 comments on commit e79db84

Please sign in to comment.