Skip to content

Commit

Permalink
Merge pull request #2086 from chanikag/autoMode
Browse files Browse the repository at this point in the history
Introduce sequence observers
  • Loading branch information
chanikag authored Sep 6, 2023
2 parents f76f311 + 518f56d commit 07b6b34
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse;

public interface SequenceFlowObserver {

/**
* Set the observer name
*
* @param name handler name
*/
void setName(String name);

/**
* This method should implement the logic to run at the start of the flow
*/
void start(MessageContext synCtx, String seqName);

/**
* This method should implement the logic to run at the end of the flow
*/
void complete(MessageContext synCtx, String seqName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public static final class Axis2Param {
/** The name of the synapse handlers file */
public static final String SYNAPSE_HANDLER_FILE = "synapse-handlers.xml";

public static final String SEQUENCE_OBSERVERS_FILE = "sequence-observers.xml";

/** the name of the property used for synapse library based class loading */
public static final String SYNAPSE_LIB_LOADER = "synapse.lib.classloader";
/** conf directory name **/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.config;

import org.apache.axiom.om.OMElement;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.commons.util.MiscellaneousUtil;

import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SequenceFlowObserversLoader {

private static final QName ROOT_Q = new QName("observers");
private static final QName OBSERVER_Q = new QName("observer");
private static final QName CLASS_Q = new QName("class");
private static final QName NAME_ATT = new QName("name");

private static Log log = LogFactory.getLog(SequenceFlowObserversLoader.class);

public static List<SequenceFlowObserver> loadObservers() {
List<SequenceFlowObserver> observers = new ArrayList<>();
OMElement observersConfig =
MiscellaneousUtil.loadXMLConfig(SynapseConstants.SEQUENCE_OBSERVERS_FILE);
if (observersConfig != null) {

if (!ROOT_Q.equals(observersConfig.getQName())) {
handleException("Invalid sequence observer configuration file");
}

Iterator iterator = observersConfig.getChildrenWithName(OBSERVER_Q);
while (iterator.hasNext()) {
OMElement observerElem = (OMElement) iterator.next();

String name = null;
if (observerElem.getAttribute(NAME_ATT) != null) {
name = observerElem.getAttributeValue(NAME_ATT);
} else {
handleException("Name not defined in one or more sequence observer");
}

if (observerElem.getAttribute(CLASS_Q) != null) {
String className = observerElem.getAttributeValue(CLASS_Q);
if (StringUtils.isNotBlank(className)) {
SequenceFlowObserver observer = createObserver(className);
if (observer != null) {
observers.add(observer);
observer.setName(name);
}
} else {
handleException("Class name is null for sequence observer name : " + name);
}
} else {
handleException("Class name not defined for sequence observer named : " + name);
}

}
}
return observers;
}

private static SequenceFlowObserver createObserver(String classFQName) {
Object obj = null;
try {
obj = Class.forName(classFQName).newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
handleException("Error creating Sequence observer for class name : " + classFQName, e);
}

if (obj instanceof SequenceFlowObserver) {
return (SequenceFlowObserver) obj;
} else {
handleException("Error creating Sequence observer. The Sequence observer should be of type " +
"org.apache.synapse.SequenceFlowObserver");
}
return null;
}

private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
}

private static void handleException(String msg, Exception ex) {
log.error(msg, ex);
throw new SynapseException(msg, ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.axiom.util.blob.OverflowBlob;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.SynapseHandler;
import org.apache.synapse.aspects.flow.statistics.store.MessageDataStore;
Expand Down Expand Up @@ -232,6 +233,20 @@ public interface SynapseEnvironment {
*/
public void registerSynapseHandler(SynapseHandler handler);

/**
* Get all sequence observers
*
* @return list of sequence observers
*/
public List<SequenceFlowObserver> getSequenceObservers();

/**
* Register a sequence observer to the synapse environment
*
* @param observer sequence observer
*/
public void registerSequenceObservers(SequenceFlowObserver observer);

/**
* Get the global timeout interval for callbacks
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.synapse.ContinuationState;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
Expand All @@ -46,6 +47,7 @@
import org.apache.synapse.carbonext.TenantInfoConfigurator;
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.commons.util.ext.TenantInfoInitiator;
import org.apache.synapse.config.SequenceFlowObserversLoader;
import org.apache.synapse.config.SynapseConfigUtils;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.SynapseHandlersLoader;
Expand Down Expand Up @@ -100,6 +102,7 @@ public class Axis2SynapseEnvironment implements SynapseEnvironment {
private SynapseTaskManager taskManager;
private RESTRequestHandler restHandler;
private List<SynapseHandler> synapseHandlers;
private List<SequenceFlowObserver> sequenceObservers;
private long globalTimeout = SynapseConstants.DEFAULT_GLOBAL_TIMEOUT;
private SynapseDebugManager synapseDebugManager;

Expand Down Expand Up @@ -207,6 +210,7 @@ public Axis2SynapseEnvironment(SynapseConfiguration synCfg) {
restHandler = new RESTRequestHandler();

synapseHandlers = SynapseHandlersLoader.loadHandlers();
sequenceObservers = SequenceFlowObserversLoader.loadObservers();

this.globalTimeout = SynapseConfigUtils.getGlobalTimeoutInterval();

Expand Down Expand Up @@ -1053,6 +1057,26 @@ public void registerSynapseHandler(SynapseHandler handler) {
synapseHandlers.add(handler);
}

/**
* Get all sequence observers
*
* @return list of sequence observers
*/
@Override
public List<SequenceFlowObserver> getSequenceObservers() {
return sequenceObservers;
}

/**
* Register a sequence observer to the synapse environment
*
* @param observer sequence observer
*/
@Override
public void registerSequenceObservers(SequenceFlowObserver observer) {
sequenceObservers.add(observer);
}

@Override
public long getGlobalTimeout() {
return globalTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseLog;
Expand All @@ -34,6 +35,7 @@
import org.apache.synapse.config.SynapsePropertiesLoader;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.apache.synapse.transport.passthru.util.RelayUtils;
import org.apache.synapse.transport.util.MessageHandlerProvider;
Expand Down Expand Up @@ -80,6 +82,12 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) {
// to pass it on; else, do nothing -> i.e. let the parents state flow
setEffectiveTraceState(synCtx);
int myEffectiveTraceState = synCtx.getTracingState();
if (this instanceof SequenceMediator & mediatorPosition == 0) {
List<SequenceFlowObserver> observers = synCtx.getEnvironment().getSequenceObservers();
for (SequenceFlowObserver observer : observers) {
observer.start(synCtx, ((SequenceMediator) this).getName());
}
}
try {
SynapseLog synLog = getLog(synCtx);
if (synLog.isTraceOrDebugEnabled()) {
Expand All @@ -104,13 +112,29 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) {
returnVal = false;
break;
}
if (i == mediators.size() - 1) {
if (this instanceof SequenceMediator) {
List<SequenceFlowObserver> observers = synCtx.getEnvironment().getSequenceObservers();
for (SequenceFlowObserver observer : observers) {
observer.complete(synCtx, ((SequenceMediator) this).getName());
}
}
}
mediator.reportCloseStatistics(synCtx, statisticReportingIndex);
} else {
synCtx.setTracingState(myEffectiveTraceState);
if (!mediator.mediate(synCtx)) {
returnVal = false;
break;
}
if (i == mediators.size() - 1) {
if (this instanceof SequenceMediator) {
List<SequenceFlowObserver> observers = synCtx.getEnvironment().getSequenceObservers();
for (SequenceFlowObserver observer : observers) {
observer.complete(synCtx, ((SequenceMediator) this).getName());
}
}
}
}
}
} catch (SynapseException synEx) {
Expand Down

0 comments on commit 07b6b34

Please sign in to comment.