Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SO timeout log issue when the worker pool is exhausted. #2081

Merged
merged 1 commit into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,20 @@ public class ClientWorker implements Runnable {
/** the axis2 message context of the request */
private MessageContext requestMessageContext;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private WorkerState state;

public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response) {
this(targetConfiguration, outMsgCtx, response, Collections.emptyList());
}

public ClientWorker(TargetConfiguration targetConfiguration, MessageContext outMsgCtx, TargetResponse response,
List<String> allowedResponseProperties) {
this.state = WorkerState.CREATED;
this.queuedTime = System.currentTimeMillis();
this.targetConfiguration = targetConfiguration;
this.response = response;
this.expectEntityBody = response.isExpectResponseBody();
Expand Down Expand Up @@ -240,15 +246,12 @@ public void run() {
return;
}
// Mark the start of the request at the beginning of the worker thread
response.getConnection().getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
response.getConnection().getContext().getAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME);
setWorkerState(WorkerState.RUNNING);

String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime);
Long clientWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long clientWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (clientWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) {
log.warn("Client worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " +
Expand All @@ -262,6 +265,16 @@ public void run() {
((NHttpServerConnection) responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION)).
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}

if (response.isForceShutdownConnectionOnComplete() && !conf.isConsumeAndDiscardBySecondaryWorkerPool()
&& conf.isConsumeAndDiscard()) {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}
}
try {
if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
Expand Down Expand Up @@ -346,6 +359,7 @@ public void run() {
log.error("Fault creating response SOAP envelope", af);
} finally {
cleanup();
setWorkerState(WorkerState.FINISHED);
}
}

Expand Down Expand Up @@ -417,6 +431,14 @@ private String inferContentType() {
return PassThroughConstants.DEFAULT_CONTENT_TYPE;
}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}

/**
* Perform cleanup of ClientWorker
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.synapse.commons.CorrelationConstants;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import org.apache.synapse.transport.passthru.util.RelayUtils;

public class MessageDiscardWorker implements Runnable {

private Log log = LogFactory.getLog(MessageDiscardWorker.class);
private ClientWorker clientWorker = null;

TargetConfiguration targetConfiguration = null;

Expand All @@ -38,60 +40,67 @@ public class MessageDiscardWorker implements Runnable {

NHttpClientConnection conn = null;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private WorkerState state;

public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response,
TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) {
TargetConfiguration targetConfiguration, NHttpClientConnection conn) {
this.state = WorkerState.CREATED;
this.response = response;
this.requestMessageContext = requestMsgContext;
this.targetConfiguration = targetConfiguration;
this.clientWorker = clientWorker;
this.conn = conn;
this.queuedTime = System.currentTimeMillis();
}

public void run() {

// Mark the start of the request message discard worker at the beginning of the worker thread
setWorkerState(WorkerState.RUNNING);
Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTimeForMessageDiscardWorker();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long messageDiscardWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (messageDiscardWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Message discard worker queued time exceeds the expected max queueing time. Expected "
+ "max queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : "
+ messageDiscardWorkerQueuedTime + "ms"+ ", CORRELATION_ID : "
+ requestMessageContext.getProperty(CorrelationConstants.CORRELATION_ID));
}

}

// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}

// Mark the end of the request message discard worker at the end of the worker thread
setWorkerState(WorkerState.FINISHED);
ClientWorker clientWorker = new ClientWorker(targetConfiguration, requestMessageContext, response);
conn.getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_REFERENCE
, clientWorker);
targetConfiguration.getWorkerPool().execute(clientWorker);

targetConfiguration.getMetrics().incrementMessagesReceived();

NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME);

sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME)
);

conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME);

PassThroughTransportUtils.setSourceConnectionContextAttributes(sourceConn, conn);
}

}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class PassThroughConstants {
public static final String CLONE_PASS_THROUGH_PIPE_REQUEST = "clone_pass-through.pipe_connected";

public static final String CONNECTION_LIMIT_EXCEEDS = "CONNECTION_LIMIT_EXCEEDS";

/**
* Name of the .mar file
*/
Expand Down Expand Up @@ -162,14 +162,10 @@ public class PassThroughConstants {

public static final String SERVER_WORKER_INIT_TIME = "SERVER_WORKER_INIT_TIME";

public static final String SERVER_WORKER_THREAD_STATUS = "SERVER_WORKER_THREAD_STATUS";

public static final String CLIENT_WORKER_THREAD_STATUS = "CLIENT_WORKER_THREAD_STATUS";

public static final String SERVER_WORKER_SIDE_QUEUED_TIME = "SERVER_WORKER_SIDE_QUEUED_TIME";

public static final String CLIENT_WORKER_SIDE_QUEUED_TIME = "CLIENT_WORKER_SIDE_QUEUED_TIME";
public static final String SERVER_WORKER_REFERENCE = "SERVER_WORKER_REFERENCE";

public static final String CLIENT_WORKER_REFERENCE = "CLIENT_WORKER_REFERENCE";
public static final String MESSAGE_DISCARD_WORKER_REFERENCE = "MESSAGE_DISCARD_WORKER_REFERENCE";
public static final String THREAD_STATUS_RUNNING = "RUNNING";

public static final String SERVER_WORKER_START_TIME = "SERVER_WORKER_START_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ public class ServerWorker implements Runnable {
private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private OutputStream os; //only used for WSDL requests..


private Long queuedTime = null;
private WorkerState state;

public ServerWorker(final SourceRequest request,
final SourceConfiguration sourceConfiguration,final OutputStream os) {
this.state = WorkerState.CREATED;
this.request = request;
this.sourceConfiguration = sourceConfiguration;

Expand All @@ -134,6 +138,7 @@ public ServerWorker(final SourceRequest request,
request.getConnection().getContext().setAttribute(NhttpConstants.SERVER_WORKER_INIT_TIME,
System.currentTimeMillis());
request.getConnection().getContext().setAttribute(PassThroughConstants.REQUEST_MESSAGE_CONTEXT, msgContext);
queuedTime = System.currentTimeMillis();
}

public ServerWorker(final SourceRequest request,
Expand All @@ -148,15 +153,12 @@ public void run() {
try {

// Mark the start of the request at the beginning of the worker thread
request.getConnection().getContext().setAttribute(PassThroughConstants.SERVER_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
request.getConnection().getContext().getAttribute(PassThroughConstants.SERVER_WORKER_START_TIME);
setWorkerState(WorkerState.RUNNING);

String expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long expectedMaxQueueingTimeInMillis = Long.parseLong(expectedMaxQueueingTime);
Long serverWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long serverWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (serverWorkerQueuedTime >= expectedMaxQueueingTimeInMillis) {
log.warn("Server worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTimeInMillis + "ms. Actual queued time : " +
Expand Down Expand Up @@ -637,6 +639,14 @@ public int compare(String o1, String o2) {
return msgContext;
}

private void setWorkerState(WorkerState workerState) {
this.state = workerState;
}

public WorkerState getWorkerState() {
return this.state;
}

private void handleException(String msg, Exception e) {
if (e == null) {
log.error(msg);
Expand Down
Loading
Loading