From bed562922f1d97cc83a752d177e85d5462731627 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Fri, 16 Jun 2023 10:51:46 +0530 Subject: [PATCH] Fix SO timeout log issue when the worker pool is exhausted. Fix SO timeout log issue when the worker pool is exhausted. Change SO timeout log structure. Define new logs to identify secondary worker pool exhausting scenarios (by defining new max queueing time for that as well + at SO timeout) Change the implementation of max queueing time since it's error-prone with a new thread pool. Improve the logs to indicate secondary passthrough pool is exhausted when all threads are busy. Fixes: https://github.com/wso2/api-manager/issues/1923 Add discard logic to clientWorker when secondary pool is not available Move thread state to the relevant worker thread itself Done a fix to move the thread state to the relevant worker thread itself rather than keeping it in the connection. Fixes: https://github.com/wso2/api-manager/issues/1923 Related to: https://github.com/wso2-support/wso2-synapse/pull/2135 --- .../transport/passthru/ClientWorker.java | 32 ++++- .../passthru/MessageDiscardWorker.java | 71 +++++----- .../passthru/PassThroughConstants.java | 12 +- .../transport/passthru/ServerWorker.java | 22 +++- .../transport/passthru/SourceHandler.java | 124 ++++++++++++------ .../transport/passthru/TargetHandler.java | 108 ++++++++------- .../transport/passthru/WorkerState.java | 31 +++++ .../config/PassThroughConfigPNames.java | 12 ++ .../config/PassThroughConfiguration.java | 25 ++++ .../connections/SourceConnections.java | 3 +- .../connections/TargetConnections.java | 3 +- .../util/PassThroughTransportUtils.java | 33 +++++ 12 files changed, 332 insertions(+), 144 deletions(-) create mode 100644 modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/WorkerState.java diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java index d8346ae0bd..4c0a841ae5 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ClientWorker.java @@ -67,14 +67,20 @@ public class ClientWorker implements Runnable { /** the axis2 message context of the request */ private MessageContext requestMessageContext; + private Long queuedTime = null; + private PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); + private WorkerState state; + public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response) { this(targetConfiguration, outMsgCtx, response, Collections.emptyList()); } public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response, List allowedResponseProperties) { + this.state = WorkerState.CREATED; + this.queuedTime = System.currentTimeMillis(); this.targetConfiguration = targetConfiguration; this.response = response; this.expectEntityBody = response.isExpectResponseBody(); @@ -240,15 +246,12 @@ public void run() { return; } // Mark the start of the request at the beginning of the worker thread - response.getConnection().getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS, - PassThroughConstants.THREAD_STATUS_RUNNING); - Object queuedTime = - response.getConnection().getContext().getAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME); + setWorkerState(WorkerState.RUNNING); String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime(); if (queuedTime != null && expectedMaxQueueingTime != null) { Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime); - Long clientWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime; + Long clientWorkerQueuedTime = System.currentTimeMillis() - queuedTime; if (clientWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) { log.warn("Client worker thread queued time exceeds the expected max queueing time. Expected max " + "queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " + @@ -262,6 +265,16 @@ public void run() { ((NHttpServerConnection) responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION)). getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis()); } + + if (response.isForceShutdownConnectionOnComplete() && !conf.isConsumeAndDiscardBySecondaryWorkerPool() + && conf.isConsumeAndDiscard()) { + // If an error has happened in the request processing, consumes the data in pipe completely and discard it + try { + RelayUtils.discardRequestMessage(requestMessageContext); + } catch (AxisFault af) { + log.error("Fault discarding request message", af); + } + } try { if (expectEntityBody) { String cType = response.getHeader(HTTP.CONTENT_TYPE); @@ -346,6 +359,7 @@ public void run() { log.error("Fault creating response SOAP envelope", af); } finally { cleanup(); + setWorkerState(WorkerState.FINISHED); } } @@ -417,6 +431,14 @@ private String inferContentType() { return PassThroughConstants.DEFAULT_CONTENT_TYPE; } + private void setWorkerState(WorkerState workerState) { + this.state = workerState; + } + + public WorkerState getWorkerState() { + return this.state; + } + /** * Perform cleanup of ClientWorker */ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java index a3381c61a8..720842ea24 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/MessageDiscardWorker.java @@ -22,13 +22,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.http.nio.NHttpClientConnection; import org.apache.http.nio.NHttpServerConnection; +import org.apache.synapse.commons.CorrelationConstants; +import org.apache.synapse.transport.passthru.config.PassThroughConfiguration; import org.apache.synapse.transport.passthru.config.TargetConfiguration; +import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils; import org.apache.synapse.transport.passthru.util.RelayUtils; public class MessageDiscardWorker implements Runnable { private Log log = LogFactory.getLog(MessageDiscardWorker.class); - private ClientWorker clientWorker = null; TargetConfiguration targetConfiguration = null; @@ -38,17 +40,38 @@ public class MessageDiscardWorker implements Runnable { NHttpClientConnection conn = null; + private Long queuedTime = null; + + private PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); + + private WorkerState state; + public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response, - TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) { + TargetConfiguration targetConfiguration, NHttpClientConnection conn) { + this.state = WorkerState.CREATED; this.response = response; this.requestMessageContext = requestMsgContext; this.targetConfiguration = targetConfiguration; - this.clientWorker = clientWorker; this.conn = conn; + this.queuedTime = System.currentTimeMillis(); } public void run() { + // Mark the start of the request message discard worker at the beginning of the worker thread + setWorkerState(WorkerState.RUNNING); + Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTimeForMessageDiscardWorker(); + if (queuedTime != null && expectedMaxQueueingTime != null) { + Long messageDiscardWorkerQueuedTime = System.currentTimeMillis() - queuedTime; + if (messageDiscardWorkerQueuedTime >= expectedMaxQueueingTime) { + log.warn("Message discard worker queued time exceeds the expected max queueing time. Expected " + + "max queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : " + + messageDiscardWorkerQueuedTime + "ms"+ ", CORRELATION_ID : " + + requestMessageContext.getProperty(CorrelationConstants.CORRELATION_ID)); + } + + } + // If an error has happened in the request processing, consumes the data in pipe completely and discard it try { RelayUtils.discardRequestMessage(requestMessageContext); @@ -56,6 +79,11 @@ public void run() { log.error("Fault discarding request message", af); } + // Mark the end of the request message discard worker at the end of the worker thread + setWorkerState(WorkerState.FINISHED); + ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMessageContext, response); + conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_REFERENCE + , clientWorker); targetConfiguration.getWorkerPool().execute(clientWorker); targetConfiguration.getMetrics().incrementMessagesReceived(); @@ -63,35 +91,16 @@ public void run() { NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty( PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); if (sourceConn != null) { - sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME); - - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME) - ); - - conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME); - + PassThroughTransportUtils.setSourceConnectionContextAttributes(sourceConn, conn); } } + + private void setWorkerState(WorkerState workerState) { + this.state = workerState; + } + + public WorkerState getWorkerState() { + return this.state; + } } diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java index 483a21aa30..6497fa51fd 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java @@ -121,7 +121,7 @@ public class PassThroughConstants { public static final String CLONE_PASS_THROUGH_PIPE_REQUEST = "clone_pass-through.pipe_connected"; public static final String CONNECTION_LIMIT_EXCEEDS = "CONNECTION_LIMIT_EXCEEDS"; - + /** * Name of the .mar file */ @@ -162,14 +162,10 @@ public class PassThroughConstants { public static final String SERVER_WORKER_INIT_TIME = "SERVER_WORKER_INIT_TIME"; - public static final String SERVER_WORKER_THREAD_STATUS = "SERVER_WORKER_THREAD_STATUS"; - - public static final String CLIENT_WORKER_THREAD_STATUS = "CLIENT_WORKER_THREAD_STATUS"; - - public static final String SERVER_WORKER_SIDE_QUEUED_TIME = "SERVER_WORKER_SIDE_QUEUED_TIME"; - - public static final String CLIENT_WORKER_SIDE_QUEUED_TIME = "CLIENT_WORKER_SIDE_QUEUED_TIME"; + public static final String SERVER_WORKER_REFERENCE = "SERVER_WORKER_REFERENCE"; + public static final String CLIENT_WORKER_REFERENCE = "CLIENT_WORKER_REFERENCE"; + public static final String MESSAGE_DISCARD_WORKER_REFERENCE = "MESSAGE_DISCARD_WORKER_REFERENCE"; public static final String THREAD_STATUS_RUNNING = "RUNNING"; public static final String SERVER_WORKER_START_TIME = "SERVER_WORKER_START_TIME"; diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ServerWorker.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ServerWorker.java index ab65b33cf2..ff5e466fe4 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ServerWorker.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/ServerWorker.java @@ -110,9 +110,13 @@ public class ServerWorker implements Runnable { private PassThroughConfiguration conf = PassThroughConfiguration.getInstance(); private OutputStream os; //only used for WSDL requests.. - + + private Long queuedTime = null; +private WorkerState state; + public ServerWorker(final SourceRequest request, final SourceConfiguration sourceConfiguration,final OutputStream os) { + this.state = WorkerState.CREATED; this.request = request; this.sourceConfiguration = sourceConfiguration; @@ -134,6 +138,7 @@ public ServerWorker(final SourceRequest request, request.getConnection().getContext().setAttribute(NhttpConstants.SERVER_WORKER_INIT_TIME, System.currentTimeMillis()); request.getConnection().getContext().setAttribute(PassThroughConstants.REQUEST_MESSAGE_CONTEXT, msgContext); + queuedTime = System.currentTimeMillis(); } public ServerWorker(final SourceRequest request, @@ -148,15 +153,12 @@ public void run() { try { // Mark the start of the request at the beginning of the worker thread - request.getConnection().getContext().setAttribute(PassThroughConstants.SERVER_WORKER_THREAD_STATUS, - PassThroughConstants.THREAD_STATUS_RUNNING); - Object queuedTime = - request.getConnection().getContext().getAttribute(PassThroughConstants.SERVER_WORKER_START_TIME); + setWorkerState(WorkerState.RUNNING); String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime(); if (queuedTime != null && expectedMaxQueueingTime != null) { Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime); - Long serverWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime; + Long serverWorkerQueuedTime = System.currentTimeMillis() - queuedTime; if (serverWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) { log.warn("Server worker thread queued time exceeds the expected max queueing time. Expected max " + "queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " + @@ -637,6 +639,14 @@ public int compare(String o1, String o2) { return msgContext; } + private void setWorkerState(WorkerState workerState) { + this.state = workerState; + } + + public WorkerState getWorkerState() { + return this.state; + } + private void handleException(String msg, Exception e) { if (e == null) { log.error(msg); diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java index 009dd13f8c..335ccb17de 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/SourceHandler.java @@ -195,19 +195,15 @@ public void requestReceived(NHttpServerConnection conn) { Object correlationId = conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID); if (correlationId != null) { WorkerPool workerPool = sourceConfiguration.getWorkerPool(); - workerPool.execute(new ServerWorker(request, sourceConfiguration, os, - System.currentTimeMillis(), correlationId.toString())); - if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) { - conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME, - System.currentTimeMillis()); - } + ServerWorker serverWorker = new ServerWorker(request, sourceConfiguration, os, + System.currentTimeMillis(), correlationId.toString()); + conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_REFERENCE, serverWorker); + workerPool.execute(serverWorker); } else { WorkerPool workerPool = sourceConfiguration.getWorkerPool(); - workerPool.execute(new ServerWorker(request, sourceConfiguration, os)); - if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) { - conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME, - System.currentTimeMillis()); - } + ServerWorker serverWorker = new ServerWorker(request, sourceConfiguration, os); + conn.getContext().setAttribute(PassThroughConstants.SERVER_WORKER_REFERENCE, serverWorker); + workerPool.execute(serverWorker); } //increasing the input request metric metrics.requestReceived(); @@ -682,13 +678,10 @@ public void timeout(NHttpServerConnection conn) { boolean isTimeoutOccurred = false; ProtocolState state = SourceContext.getState(conn); Map logDetails = getLoggingInfo(conn, state); - - if (!PassThroughConstants.THREAD_STATUS_RUNNING.equals(conn.getContext().getAttribute - (PassThroughConstants.SERVER_WORKER_THREAD_STATUS))) { - log.warn("Source Handler Socket Timeout occurred while the worker pool exhausted, " + - "INTERNAL_STATE = " + state + ", CORRELATION_ID = " - + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); - } + Object serverWorker = conn.getContext().getAttribute( + PassThroughConstants.SERVER_WORKER_REFERENCE); + Object messageDiscardWorker = conn.getContext().getAttribute( + PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE); if (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE) { if (log.isDebugEnabled()) { log.debug(conn + ": Keep-Alive connection was time out: "); @@ -699,14 +692,21 @@ public void timeout(NHttpServerConnection conn) { informReaderError(conn); isTimeoutOccurred = true; - log.warn("STATE_DESCRIPTION = Socket Timeout occurred after reading the request headers but Server is " - + "still reading the request body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails - .get("direction") + ", " - + "CAUSE_OF_ERROR = Connection between the client and the EI timeouts, HTTP_URL = " + logDetails - .get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") + ", SOCKET_TIMEOUT = " + conn - .getSocketTimeout() + ", CLIENT_ADDRESS = " + getClientConnectionInfo(conn) + ", CONNECTION " + conn - + " Correlation ID : " + conn.getContext().getAttribute( - CorrelationConstants.CORRELATION_ID).toString()); + String logMessage = "STATE_DESCRIPTION = Socket Timeout occurred after reading the request " + + "headers but Server is " + + "still reading the request body, INTERNAL_STATE = " + state + ", DIRECTION = " + + logDetails.get("direction") + ", " + + "CAUSE_OF_ERROR = Connection between the client and the EI timeouts, HTTP_URL = " + + logDetails.get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") + ", SOCKET_TIMEOUT = " + + conn.getSocketTimeout() + ", CLIENT_ADDRESS = " + getClientConnectionInfo(conn) + + ", CORRELATION_ID = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + ", CONNECTION = " + conn; + if (isPrimaryWorkerPoolExhausted(serverWorker)) { + log.warn(logMessage + ", Could not get a PassThroughMessageProcessor thread to process the " + + "request message. The primary worker pool is exhausted."); + } else { + log.warn(logMessage + secondaryWorkerPoolExhaustedErrorMessage(messageDiscardWorker)); + } if (PassThroughCorrelationConfigDataHolder.isEnable()) { logHttpRequestErrorInCorrelationLog(conn, "TIMEOUT in " + state.name()); } @@ -714,14 +714,22 @@ public void timeout(NHttpServerConnection conn) { informWriterError(conn); isTimeoutOccurred = true; metrics.timeoutOccured(); - log.warn("STATE_DESCRIPTION = Socket Timeout occurred after server writing the response headers to the " + String logMessage = "STATE_DESCRIPTION = Socket Timeout occurred after server writing the response" + + " headers to the " + "client" + "but Server is still writing the response body, INTERNAL_STATE = " + state + ", DIRECTION = " + logDetails.get("direction") + ", " - + "CAUSE_OF_ERROR = Connection between the client and the EI timeouts, HTTP_URL = " + logDetails - .get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") + ", SOCKET_TIMEOUT = " + conn - .getSocketTimeout() + ", CLIENT_ADDRESS = " + getClientConnectionInfo(conn) + ", CONNECTION " + conn - + " Correlation ID : " + conn.getContext().getAttribute( - CorrelationConstants.CORRELATION_ID).toString()); + + "CAUSE_OF_ERROR = Connection between the client and the EI timeouts, HTTP_URL = " + + logDetails.get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") + ", SOCKET_TIMEOUT = " + + conn.getSocketTimeout() + ", CLIENT_ADDRESS = " + getClientConnectionInfo(conn) + + ", CORRELATION_ID = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + ", CONNECTION = " + conn; + + if (isPrimaryWorkerPoolExhausted(serverWorker)) { + log.warn(logMessage + ", Could not get a PassThroughMessageProcessor thread to process the " + + "request message. The primary worker pool is exhausted."); + } else { + log.warn(logMessage + secondaryWorkerPoolExhaustedErrorMessage(messageDiscardWorker)); + } if (PassThroughCorrelationConfigDataHolder.isEnable()) { logHttpRequestErrorInCorrelationLog(conn, "TIMEOUT in " + state.name()); } @@ -729,15 +737,22 @@ public void timeout(NHttpServerConnection conn) { informWriterError(conn); isTimeoutOccurred = true; metrics.timeoutOccured(); - log.warn( - "STATE_DESCRIPTION = Socket Timeout occurred after accepting the request headers and the request " - + "body, INTERNAL_STATE = " - + state + ", DIRECTION = " + logDetails.get("direction") + ", " - + "CAUSE_OF_ERROR = Connection between the client and the WSO2 server timeouts, HTTP_URL = " - + logDetails.get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") - + ", SOCKET_TIMEOUT = " + conn.getSocketTimeout() + ", CLIENT_ADDRESS = " - + getClientConnectionInfo(conn) + ", CONNECTION " + conn + " Correlation ID : " - + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID).toString()); + String logMessage = "STATE_DESCRIPTION = Socket Timeout occurred after accepting the request" + + " headers and the request " + + "body, INTERNAL_STATE = " + + state + ", DIRECTION = " + logDetails.get("direction") + ", " + + "CAUSE_OF_ERROR = Connection between the client and the WSO2 server timeouts, HTTP_URL = " + + logDetails.get("url") + ", " + "HTTP_METHOD = " + logDetails.get("method") + + ", SOCKET_TIMEOUT = " + conn.getSocketTimeout() + ", CLIENT_ADDRESS = " + + getClientConnectionInfo(conn) + + ", CORRELATION_ID = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + ", CONNECTION = " + conn; + if (isPrimaryWorkerPoolExhausted(serverWorker)) { + log.warn(logMessage + ", Could not get a PassThroughMessageProcessor thread to process the " + + "request message. The primary worker pool is exhausted."); + } else { + log.warn(logMessage + secondaryWorkerPoolExhaustedErrorMessage(messageDiscardWorker)); + } if (PassThroughCorrelationConfigDataHolder.isEnable()) { logHttpRequestErrorInCorrelationLog(conn, "TIMEOUT in " + state.name()); } @@ -751,6 +766,31 @@ public void timeout(NHttpServerConnection conn) { } } + private String secondaryWorkerPoolExhaustedErrorMessage(Object messageDiscardWorker) { + String workerPoolExhaustedMessage = ""; + if (messageDiscardWorker == null) { + return workerPoolExhaustedMessage; + } + MessageDiscardWorker msgDiscardWorker = (MessageDiscardWorker) messageDiscardWorker; + if (WorkerState.CREATED == msgDiscardWorker.getWorkerState()) { + workerPoolExhaustedMessage = ", Could not get a secondary worker thread to discard the request content. " + + "The secondary worker pool is exhausted."; + return workerPoolExhaustedMessage; + } else if (WorkerState.RUNNING == msgDiscardWorker.getWorkerState()) { + workerPoolExhaustedMessage = ", The secondary worker thread which was discarding the request content" + + " has been released."; + return workerPoolExhaustedMessage; + } + return workerPoolExhaustedMessage; + } + + private boolean isPrimaryWorkerPoolExhausted(Object serverWorker) { + if (serverWorker == null) { + return false; + } + return WorkerState.RUNNING != ((ServerWorker)serverWorker).getWorkerState(); + } + public void closed(NHttpServerConnection conn) { ProtocolState state = SourceContext.getState(conn); Map logDetails = getLoggingInfo(conn, state); @@ -881,7 +921,7 @@ public void exception(NHttpServerConnection conn, Exception ex) { sourceConfiguration.getHttpProcessor().process(response, httpContext); - conn.submitResponse(response); + conn.submitResponse(response); SourceContext.updateState(conn, ProtocolState.CLOSED); informWriterError(conn); conn.close(); diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java index 6f4a4f041e..1480e49f44 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java @@ -51,6 +51,8 @@ import org.apache.synapse.transport.passthru.config.TargetConfiguration; import org.apache.synapse.transport.passthru.connections.HostConnections; import org.apache.synapse.transport.passthru.jmx.PassThroughTransportMetricsCollector; +import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils; +import org.apache.synapse.transport.passthru.util.RelayUtils; import java.io.IOException; import java.net.SocketAddress; @@ -515,56 +517,33 @@ public void responseReceived(NHttpClientConnection conn) { if (statusCode == HttpStatus.SC_ACCEPTED && handle202(requestMsgContext)) { return; } - if (targetResponse.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscard()) { - ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMsgContext, targetResponse, - allowedResponseProperties); - targetConfiguration.getSecondaryWorkerPool().execute(new MessageDiscardWorker(requestMsgContext, - targetResponse, targetConfiguration, clientWorker, conn)); + if (targetResponse.isForceShutdownConnectionOnComplete() && conf.isConsumeAndDiscardBySecondaryWorkerPool() + && conf.isConsumeAndDiscard()) { + NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( + PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); + MessageDiscardWorker messageDiscardWorker = new MessageDiscardWorker(requestMsgContext, + targetResponse, targetConfiguration, conn); + conn.getContext().setAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE + , messageDiscardWorker); + if (sourceConn != null) { + sourceConn.getContext().setAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE + , messageDiscardWorker); + } + targetConfiguration.getSecondaryWorkerPool().execute(messageDiscardWorker); return; } WorkerPool workerPool = targetConfiguration.getWorkerPool(); - workerPool.execute( - new ClientWorker(targetConfiguration, requestMsgContext, targetResponse, - allowedResponseProperties)); - if (workerPool.getActiveCount() >= conf.getWorkerPoolCoreSize()) { - conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME, - System.currentTimeMillis()); - } + ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMsgContext, targetResponse); + conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_REFERENCE, clientWorker); + workerPool.execute(clientWorker); targetConfiguration.getMetrics().incrementMessagesReceived(); NHttpServerConnection sourceConn = (NHttpServerConnection) requestMsgContext.getProperty( PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION); if (sourceConn != null) { - sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME); - - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME) - ); - - conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME); - sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME, - conn.getContext() - .getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME) - ); - conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME); - + PassThroughTransportUtils.setSourceConnectionContextAttributes(sourceConn, conn); } } catch (Exception ex) { @@ -881,17 +860,14 @@ public void timeout(NHttpClientConnection conn) { ProtocolState state = TargetContext.getState(conn); MessageContext requestMsgCtx = TargetContext.get(conn).getRequestMsgCtx(); Map logDetails = getLoggingInfo(conn, state, requestMsgCtx); + Object clientWorker = conn.getContext().getAttribute( + PassThroughConstants.CLIENT_WORKER_REFERENCE); + Object messageDiscardWorker = conn.getContext().getAttribute( + PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE); if (log.isDebugEnabled()) { log.debug(getErrorMessage("Connection timeout", conn) + " "+ getConnectionLoggingInfo(conn)); } - if (!PassThroughConstants.THREAD_STATUS_RUNNING.equals(conn.getContext().getAttribute - (PassThroughConstants.CLIENT_WORKER_THREAD_STATUS))) { - log.warn("Target Handler Socket Timeout occurred while the worker pool exhausted, " + - "INTERNAL_STATE = " + state + ", CORRELATION_ID = " - + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); - } - if (state != null && (state == ProtocolState.REQUEST_READY || state == ProtocolState.RESPONSE_DONE)) { if (log.isDebugEnabled()) { @@ -911,7 +887,8 @@ public void timeout(NHttpClientConnection conn) { + ", TRIGGER_TYPE = " + logDetails.get("trigger_type") + ", TRIGGER_NAME = " + logDetails .get("trigger_name") + ", REMOTE_ADDRESS = " + getBackEndConnectionInfo(conn) + ", " + "CONNECTION = " + conn + ", SOCKET_TIMEOUT = " + conn.getSocketTimeout() + ", CORRELATION_ID" - + " = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); + + " = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + workerPoolExhaustedErrorMessage(clientWorker, messageDiscardWorker)); } if (state == ProtocolState.RESPONSE_BODY || state == ProtocolState.REQUEST_HEAD) { @@ -926,7 +903,8 @@ public void timeout(NHttpClientConnection conn) { + ", TRIGGER_TYPE = " + logDetails.get("trigger_type") + ", TRIGGER_NAME = " + logDetails .get("trigger_name") + ", REMOTE_ADDRESS = " + getBackEndConnectionInfo(conn) + ", " + "CONNECTION = " + conn + ", SOCKET_TIMEOUT = " + conn.getSocketTimeout() + ", " - + "CORRELATION_ID = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); + + "CORRELATION_ID = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + workerPoolExhaustedErrorMessage(clientWorker, messageDiscardWorker)); } if (state.compareTo(ProtocolState.REQUEST_DONE) <= 0) { @@ -939,7 +917,8 @@ public void timeout(NHttpClientConnection conn) { + ", TRIGGER_TYPE = " + logDetails.get("trigger_type") + ", TRIGGER_NAME = " + logDetails .get("trigger_name") + ", REMOTE_ADDRESS = " + getBackEndConnectionInfo(conn) + ", " + "CONNECTION = " + conn + ", SOCKET_TIMEOUT = " + conn.getSocketTimeout() + ", CORRELATION_ID" - + " = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID)); + + " = " + conn.getContext().getAttribute(CorrelationConstants.CORRELATION_ID) + + workerPoolExhaustedErrorMessage(clientWorker, messageDiscardWorker)); if (PassThroughCorrelationConfigDataHolder.isEnable()) { logHttpRequestErrorInCorrelationLog(conn, "Timeout in " + state); @@ -960,6 +939,35 @@ public void timeout(NHttpClientConnection conn) { targetConfiguration.getConnections().closeConnection(conn, true); } + private String workerPoolExhaustedErrorMessage(Object clientWorker + , Object messageDiscardWorker) { + String workerPoolExhaustedMessage = ""; + if (messageDiscardWorker != null + && WorkerState.CREATED == ((MessageDiscardWorker) messageDiscardWorker).getWorkerState()) { + workerPoolExhaustedMessage = ", Could not get a secondary worker thread to discard the request content. " + + "The secondary worker pool is exhausted."; + return workerPoolExhaustedMessage; + } else if (messageDiscardWorker != null + && WorkerState.RUNNING == ((MessageDiscardWorker) messageDiscardWorker).getWorkerState()) { + workerPoolExhaustedMessage = ", The secondary worker thread which was discarding the request content" + + " has been released."; + return workerPoolExhaustedMessage; + } else if (messageDiscardWorker != null + && WorkerState.FINISHED == ((MessageDiscardWorker) messageDiscardWorker).getWorkerState()) { + if (clientWorker != null + && WorkerState.CREATED == ((ClientWorker)clientWorker).getWorkerState()) { + workerPoolExhaustedMessage = ", Could not get a PassThroughMessageProcessor thread to process the " + + "response message. The primary worker pool is exhausted."; + return workerPoolExhaustedMessage; + } + } else if (clientWorker != null && WorkerState.CREATED == ((ClientWorker)clientWorker).getWorkerState()) { + workerPoolExhaustedMessage = ", Could not get a PassThroughMessageProcessor thread to process the " + + "response message. The primary worker pool is exhausted."; + return workerPoolExhaustedMessage; + } + return workerPoolExhaustedMessage; + } + private boolean isResponseHaveBodyExpected( final String method, final HttpResponse response) { diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/WorkerState.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/WorkerState.java new file mode 100644 index 0000000000..355c8bb7f1 --- /dev/null +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/WorkerState.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.synapse.transport.passthru; + +/** + * State of workers in pass-through transport. + */ +public enum WorkerState { + /** Worker is marked for execution waiting for thread */ + CREATED, + + /** Worker is executing */ + RUNNING, + + /** Worker completed the job and is marked for termination */ + FINISHED; +} diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java index 44fdc78541..931e1d32e4 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfigPNames.java @@ -112,6 +112,12 @@ public interface PassThroughConfigPNames { */ public String CONSUME_AND_DISCARD = "consume_and_discard"; + /** + * Define property to mark If an error has happened in the request processing, + * should consume the data in pipe completely and discard using secondary worker pool. + */ + public String CONSUME_AND_DISCARD_BY_SECONDARY_POOL = "consume_and_discard_by_secondary_pool"; + /** * Defines whether we should close target connection on endpoint timeout. */ @@ -161,6 +167,12 @@ public interface PassThroughConfigPNames { */ public String EXPECTED_MAX_QUEUEING_TIME = "expected_max_queueing_time"; + /** + * Defines max waiting time for a request to be queued for a worker thread + */ + public String EXPECTED_MAX_QUEUEING_TIME_FOR_MESSAGE_DISCARD_WORKER + = "expected_max_queueing_time_for_message_discard_worker"; + /** * Defines whether viewing services are enabled or not */ diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java index ec22cf0e4e..d1268e6fdd 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/config/PassThroughConfiguration.java @@ -57,6 +57,8 @@ public class PassThroughConfiguration { private Boolean isConsumeAndDiscard = true; + private Boolean isConsumeAndDiscardBySecondaryWorkerPool = true; + //additional rest dispatch handlers private static final String REST_DISPATCHER_SERVICE="rest.dispatcher.service"; // URI configurations that determine if it requires custom rest dispatcher @@ -156,6 +158,13 @@ public boolean isConsumeAndDiscard() { return isConsumeAndDiscard; } + public boolean isConsumeAndDiscardBySecondaryWorkerPool() { + isConsumeAndDiscardBySecondaryWorkerPool = + ConfigurationBuilderUtil.getBooleanProperty( + PassThroughConfigPNames.CONSUME_AND_DISCARD_BY_SECONDARY_POOL, true, props); + return isConsumeAndDiscardBySecondaryWorkerPool; + } + public int getMaxActiveConnections() { return ConfigurationBuilderUtil.getIntProperty(PassThroughConfigPNames.C_MAX_ACTIVE, DEFAULT_MAX_ACTIVE_CON, props); @@ -244,6 +253,22 @@ public String getExpectedMaxQueueingTime() { EXPECTED_MAX_QUEUEING_TIME_DEFAULT); } + public Long getExpectedMaxQueueingTimeForMessageDiscardWorker() { + String expectedMaxQueuingTime = getStringProperty( + PassThroughConfigPNames.EXPECTED_MAX_QUEUEING_TIME_FOR_MESSAGE_DISCARD_WORKER, + EXPECTED_MAX_QUEUEING_TIME_DEFAULT); + Long convertedExpectedMaxQueuingTime; + try { + convertedExpectedMaxQueuingTime = Long.parseLong(expectedMaxQueuingTime); + } catch (NumberFormatException exception) { + log.warn("Invalid value for the expected max queuing time for message discard worker. Expected max " + + "queuing time should be a long value. " + + "Using the default value " + EXPECTED_MAX_QUEUEING_TIME_DEFAULT); + convertedExpectedMaxQueuingTime = Long.parseLong(EXPECTED_MAX_QUEUEING_TIME_DEFAULT); + } + return convertedExpectedMaxQueuingTime; + } + /** * Loads the properties from a given property file path * diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java index f080ad9fa8..e6cf1206f0 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/SourceConnections.java @@ -93,7 +93,8 @@ public void releaseConnection(NHttpServerConnection conn) { removeAttributes(conn); lock.lock(); try { - conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS); + conn.getContext().removeAttribute(PassThroughConstants.SERVER_WORKER_REFERENCE); + conn.getContext().removeAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE); SourceContext.get(conn).reset(); if (busyConnections.remove(conn)) { diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java index da3c35ab50..09ef401f36 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/connections/TargetConnections.java @@ -229,7 +229,8 @@ public void releaseConnection(NHttpClientConnection conn) { PassThroughConstants.CONNECTION_POOL); TargetContext.get(conn).reset(false); - conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS); + conn.getContext().removeAttribute(PassThroughConstants.CLIENT_WORKER_REFERENCE); + conn.getContext().removeAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_REFERENCE); //Set the event mask to Read since connection is released to the pool and should be ready to read conn.requestInput(); diff --git a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java index 01cd8c8cc7..51aa7c6fcb 100644 --- a/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java +++ b/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/util/PassThroughTransportUtils.java @@ -26,6 +26,8 @@ import org.apache.axis2.description.AxisService; import org.apache.axis2.description.Parameter; import org.apache.axis2.description.AxisOperation; +import org.apache.http.nio.NHttpClientConnection; +import org.apache.http.nio.NHttpServerConnection; import org.apache.http.protocol.HTTP; import org.apache.http.HttpStatus; import org.apache.commons.logging.Log; @@ -128,6 +130,37 @@ public static void removeUnwantedHeaders(MessageContext msgContext, TargetConfig } + public static void setSourceConnectionContextAttributes(NHttpServerConnection sourceConn, + NHttpClientConnection conn) { + sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME); + + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME) + ); + + conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME); + sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME, + conn.getContext() + .getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME) + ); + conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME); + } + /** * Remove unwanted headers from the given header map.