Skip to content

Commit

Permalink
Merge pull request #2081 from malakaganga/change_thread_context
Browse files Browse the repository at this point in the history
Fix SO timeout log issue when the worker pool is exhausted.
  • Loading branch information
malakaganga authored Jun 29, 2023
2 parents b6c8fda + bed5629 commit dd90d47
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,20 @@ public class ClientWorker implements Runnable {
/** the axis2 message context of the request */
private MessageContext requestMessageContext;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private WorkerState state;

public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response) {
this(targetConfiguration, outMsgCtx, response, Collections.emptyList());
}

public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response,
List<String> allowedResponseProperties) {
this.state = WorkerState.CREATED;
this.queuedTime = System.currentTimeMillis();
this.targetConfiguration = targetConfiguration;
this.response = response;
this.expectEntityBody = response.isExpectResponseBody();
Expand Down Expand Up @@ -240,15 +246,12 @@ public void run() {
return;
}
// Mark the start of the request at the beginning of the worker thread
response.getConnection().getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
response.getConnection().getContext().getAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME);
setWorkerState(WorkerState.RUNNING);

String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime);
Long clientWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long clientWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (clientWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) {
log.warn("Client worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " +
Expand All @@ -262,6 +265,16 @@ public void run() {
((NHttpServerConnection) responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION)).
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}

if (response.isForceShutdownConnectionOnComplete() && !conf.isConsumeAndDiscardBySecondaryWorkerPool()
&& conf.isConsumeAndDiscard()) {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}
}
try {
if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
Expand Down Expand Up @@ -346,6 +359,7 @@ public void run() {
log.error("Fault creating response SOAP envelope", af);
} finally {
cleanup();
setWorkerState(WorkerState.FINISHED);
}
}

Expand Down Expand Up @@ -417,6 +431,14 @@ private String inferContentType() {
return PassThroughConstants.DEFAULT_CONTENT_TYPE;
}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}

/**
* Perform cleanup of ClientWorker
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.synapse.commons.CorrelationConstants;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import org.apache.synapse.transport.passthru.util.RelayUtils;

public class MessageDiscardWorker implements Runnable {

private Log log = LogFactory.getLog(MessageDiscardWorker.class);
private ClientWorker clientWorker = null;

TargetConfiguration targetConfiguration = null;

Expand All @@ -38,60 +40,67 @@ public class MessageDiscardWorker implements Runnable {

NHttpClientConnection conn = null;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private WorkerState state;

public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response,
TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) {
TargetConfiguration targetConfiguration, NHttpClientConnection conn) {
this.state = WorkerState.CREATED;
this.response = response;
this.requestMessageContext = requestMsgContext;
this.targetConfiguration = targetConfiguration;
this.clientWorker = clientWorker;
this.conn = conn;
this.queuedTime = System.currentTimeMillis();
}

public void run() {

// Mark the start of the request message discard worker at the beginning of the worker thread
setWorkerState(WorkerState.RUNNING);
Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTimeForMessageDiscardWorker();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long messageDiscardWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (messageDiscardWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Message discard worker queued time exceeds the expected max queueing time. Expected "
+ "max queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : "
+ messageDiscardWorkerQueuedTime + "ms"+ ", CORRELATION_ID : "
+ requestMessageContext.getProperty(CorrelationConstants.CORRELATION_ID));
}

}

// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}

// Mark the end of the request message discard worker at the end of the worker thread
setWorkerState(WorkerState.FINISHED);
ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMessageContext, response);
conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_REFERENCE
, clientWorker);
targetConfiguration.getWorkerPool().execute(clientWorker);

targetConfiguration.getMetrics().incrementMessagesReceived();

NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME);

sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME)
);

conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME);

PassThroughTransportUtils.setSourceConnectionContextAttributes(sourceConn, conn);
}

}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class PassThroughConstants {
public static final String CLONE_PASS_THROUGH_PIPE_REQUEST = "clone_pass-through.pipe_connected";

public static final String CONNECTION_LIMIT_EXCEEDS = "CONNECTION_LIMIT_EXCEEDS";

/**
* Name of the .mar file
*/
Expand Down Expand Up @@ -162,14 +162,10 @@ public class PassThroughConstants {

public static final String SERVER_WORKER_INIT_TIME = "SERVER_WORKER_INIT_TIME";

public static final String SERVER_WORKER_THREAD_STATUS = "SERVER_WORKER_THREAD_STATUS";

public static final String CLIENT_WORKER_THREAD_STATUS = "CLIENT_WORKER_THREAD_STATUS";

public static final String SERVER_WORKER_SIDE_QUEUED_TIME = "SERVER_WORKER_SIDE_QUEUED_TIME";

public static final String CLIENT_WORKER_SIDE_QUEUED_TIME = "CLIENT_WORKER_SIDE_QUEUED_TIME";
public static final String SERVER_WORKER_REFERENCE = "SERVER_WORKER_REFERENCE";

public static final String CLIENT_WORKER_REFERENCE = "CLIENT_WORKER_REFERENCE";
public static final String MESSAGE_DISCARD_WORKER_REFERENCE = "MESSAGE_DISCARD_WORKER_REFERENCE";
public static final String THREAD_STATUS_RUNNING = "RUNNING";

public static final String SERVER_WORKER_START_TIME = "SERVER_WORKER_START_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ public class ServerWorker implements Runnable {
private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private OutputStream os; //only used for WSDL requests..


private Long queuedTime = null;
private WorkerState state;

public ServerWorker(final SourceRequest request,
final SourceConfiguration sourceConfiguration,final OutputStream os) {
this.state = WorkerState.CREATED;
this.request = request;
this.sourceConfiguration = sourceConfiguration;

Expand All @@ -134,6 +138,7 @@ public ServerWorker(final SourceRequest request,
request.getConnection().getContext().setAttribute(NhttpConstants.SERVER_WORKER_INIT_TIME,
System.currentTimeMillis());
request.getConnection().getContext().setAttribute(PassThroughConstants.REQUEST_MESSAGE_CONTEXT, msgContext);
queuedTime = System.currentTimeMillis();
}

public ServerWorker(final SourceRequest request,
Expand All @@ -148,15 +153,12 @@ public void run() {
try {

// Mark the start of the request at the beginning of the worker thread
request.getConnection().getContext().setAttribute(PassThroughConstants.SERVER_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
request.getConnection().getContext().getAttribute(PassThroughConstants.SERVER_WORKER_START_TIME);
setWorkerState(WorkerState.RUNNING);

String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime);
Long serverWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long serverWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (serverWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) {
log.warn("Server worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " +
Expand Down Expand Up @@ -637,6 +639,14 @@ public int compare(String o1, String o2) {
return msgContext;
}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}

private void handleException(String msg, Exception e) {
if (e == null) {
log.error(msg);
Expand Down
Loading

0 comments on commit dd90d47

Please sign in to comment.