This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ed5cc8a6c9f Seda (#14703)
ed5cc8a6c9f is described below
commit ed5cc8a6c9f54a4fdf25a2eed03fa528c8b0f9a1
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jul 2 18:01:34 2024 +0200
Seda (#14703)
* CAMEL-20934: camel-seda - Sending to consumers should use callback for
completion work to avoid thread-safety issues
* Run test once
---
.../apache/camel/component/seda/SedaConsumer.java | 41 ++++++++++++----------
.../camel/processor/WireTapAbortPolicyTest.java | 4 +--
2 files changed, 24 insertions(+), 21 deletions(-)
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 1f04eded390..5fdb9188710 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -31,8 +32,6 @@ import org.apache.camel.Suspendable;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.EmptyAsyncCallback;
-import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -182,17 +181,19 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
}
if (exchange != null) {
try {
+ final Exchange target = exchange;
// prepare the exchange before sending to consumer
- Exchange newExchange = prepareExchange(exchange);
+ prepareExchange(target);
+ // callback to be executed when sending to consumer and processing is done
+ AsyncCallback callback = doneSync -> {
+ // log exception if an exception occurred and was not handled
+ if (target.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", target,
+ target.getException());
+ }
+ };
// process the exchange
- sendToConsumers(newExchange);
- // copy result back
- ExchangeHelper.copyResults(exchange, newExchange);
- // log exception if an exception occurred and was not handled
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing exchange", exchange,
- exchange.getException());
- }
+ sendToConsumers(target, callback);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
@@ -204,7 +205,6 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
} catch (InterruptedException e) {
LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
Thread.currentThread().interrupt();
- continue;
} catch (Exception e) {
if (exchange != null) {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
@@ -218,14 +218,12 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
/**
* Strategy to prepare exchange for being processed by this consumer
*
- * @param exchange the exchange
- * @return the exchange to process by this consumer.
+ * @param exchange the exchange
*/
- protected Exchange prepareExchange(Exchange exchange) {
+ protected void prepareExchange(Exchange exchange) {
// this consumer grabbed the exchange so mark its from this route/endpoint
exchange.getExchangeExtension().setFromEndpoint(getEndpoint());
exchange.getExchangeExtension().setFromRouteId(getRouteId());
- return exchange;
}
/**
@@ -237,9 +235,10 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
* If there is only a single consumer then its dispatched directly to it using same thread.
*
* @param exchange the exchange
+ * @param callback exchange callback to continue routing
* @throws Exception can be thrown if processing of the exchange failed
*/
- protected void sendToConsumers(final Exchange exchange) throws Exception {
+ protected void sendToConsumers(final Exchange exchange, final AsyncCallback callback) throws Exception {
// validate multiple consumers has been enabled
int size = getEndpoint().getConsumers().size();
if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) {
@@ -263,11 +262,15 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
// and use the asynchronous routing engine to support it
mp.process(exchange, doneSync -> {
// done the uow on the completions
- UnitOfWorkHelper.doneSynchronizations(exchange, completions);
+ try {
+ UnitOfWorkHelper.doneSynchronizations(exchange, completions);
+ } finally {
+ callback.done(doneSync);
+ }
});
} else {
// use the regular processor and use the asynchronous routing engine to support it
- getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
+ getAsyncProcessor().process(exchange, callback);
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 81b279f21dd..d96527a0f8d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -29,7 +29,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.parallel.Isolated;
@@ -59,7 +59,7 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
}
}
- @RepeatedTest(value = 1000)
+ @Test
public void testSend() throws Exception {
// hello must come first, as we have delay on the tapped route
result.expectedMinimumMessageCount(2);