This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch sandbox/camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit c698faeca8fb118f1a281b84dec269d367d834dd Author: Guillaume Nodet <[email protected]> AuthorDate: Mon Oct 1 17:21:23 2018 +0200 Make InterceptSendToEndpointProcessor asynchronous --- .../apache/camel/impl/InterceptSendToEndpoint.java | 2 +- .../impl/InterceptSendToEndpointProcessor.java | 41 ++++++++++------------ 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java index 3b8a0af..044d538 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java @@ -94,7 +94,7 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService { @Override public AsyncProducer createAsyncProducer() throws Exception { - Producer producer = delegate.createProducer(); + AsyncProducer producer = delegate.createAsyncProducer(); return new InterceptSendToEndpointProcessor(this, delegate, producer, skip); } diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java index 5ed03e5..07f245f 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java @@ -18,9 +18,11 @@ package org.apache.camel.impl; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.AsyncProducer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Producer; +import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +37,10 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { private final InterceptSendToEndpoint endpoint; private final Endpoint delegate; - private final Producer producer; + private final AsyncProducer producer; private final boolean skip; - public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, Producer producer, boolean skip) throws Exception { + public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) throws Exception { super(delegate); this.endpoint = endpoint; this.delegate = delegate; @@ -61,18 +63,19 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { if (endpoint.getDetour() != null) { // detour the exchange using synchronous processing - try { - endpoint.getDetour().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } + AsyncProcessor detour = AsyncProcessorConverterHelper.convert(endpoint.getDetour()); + return detour.process(exchange, s -> callback(exchange, callback, s)); } + return callback(exchange, callback, true); + } + + private boolean callback(Exchange exchange, AsyncCallback callback, boolean doneSync) { // Decide whether to continue or not; similar logic to the Pipeline // check for error if so we should break out if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), log)) { - callback.done(true); - return true; + callback.done(doneSync); + return doneSync; } // determine if we should skip or not @@ -92,24 +95,16 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { } // route to original destination leveraging the asynchronous routing engine if possible - if (producer instanceof AsyncProcessor) { - AsyncProcessor async = (AsyncProcessor) producer; - return async.process(exchange, callback); - } else { - try { - producer.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - callback.done(true); - return true; - } + boolean s = producer.process(exchange, ds -> { + callback.done(doneSync && ds); + }); + return doneSync && s; } else { if (log.isDebugEnabled()) { log.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange); } - callback.done(true); - return true; + callback.done(doneSync); + return doneSync; } }
