Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-21343: camel-seda - Allow to mimic the behavior of camel-vm #15930

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeExtension;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
Expand Down Expand Up @@ -181,19 +182,13 @@ protected void doRun() {
}
if (exchange != null) {
try {
final Exchange target = exchange;
final Exchange original = exchange;
// prepare the exchange before sending to consumer
prepareExchange(target);
final Exchange prepared = prepareExchange(exchange);
// callback to be executed when sending to consumer and processing is done
AsyncCallback callback = doneSync -> {
// log exception if an exception occurred and was not handled
if (target.getException() != null) {
getExceptionHandler().handleException("Error processing exchange", target,
target.getException());
}
};
AsyncCallback callback = doneSync -> onProcessingDone(original, prepared);
// process the exchange
sendToConsumers(target, callback);
sendToConsumers(prepared, callback);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
Expand All @@ -215,15 +210,38 @@ protected void doRun() {
}
}

/**
* Strategy to invoke when the exchange is done being processed.
* <p/>
* This method is meant to be overridden by subclasses to be able to mimic the behavior of the legacy component
* camel-vm, that is why the parameter {@code prepared} is not used by default.
*
* @param original the exchange before being processed
* @param prepared the exchange processed
*/
protected void onProcessingDone(Exchange original, Exchange prepared) {
// log exception if an exception occurred and was not handled
if (original.getException() != null) {
getExceptionHandler().handleException("Error processing exchange", original,
original.getException());
}
}

/**
* Strategy to prepare exchange for being processed by this consumer
* <p/>
* This method is meant to be overridden by subclasses to be able to mimic the behavior of the legacy component
* camel-vm, that is why the prepared exchange is returned.
*
* @param exchange the exchange
* @param exchange the exchange
* @return the exchange to process by this consumer
*/
protected void prepareExchange(Exchange exchange) {
// this consumer grabbed the exchange so mark its from this route/endpoint
exchange.getExchangeExtension().setFromEndpoint(getEndpoint());
exchange.getExchangeExtension().setFromRouteId(getRouteId());
protected Exchange prepareExchange(Exchange exchange) {
// this consumer grabbed the exchange so mark it's from this route/endpoint
ExchangeExtension exchangeExtension = exchange.getExchangeExtension();
exchangeExtension.setFromEndpoint(getEndpoint());
exchangeExtension.setFromRouteId(getRouteId());
return exchange;
}

/**
Expand Down