This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 05d51559bc56 CAMEL-22950: Ensure RecipientList propagates the orignal
exchange when using UseOriginalAggregationStrategy (#21266)
05d51559bc56 is described below
commit 05d51559bc5681aab92e5d23240ae92e2b74dee8
Author: David Riseley <[email protected]>
AuthorDate: Thu Feb 5 13:47:09 2026 +0000
CAMEL-22950: Ensure RecipientList propagates the orignal exchange when
using UseOriginalAggregationStrategy (#21266)
Ensure UseOriginalAggregationStrategy handles missing old exchange
---
.../camel/processor/RecipientListProcessor.java | 20 ++++
.../aggregate/UseOriginalAggregationStrategy.java | 11 ++-
...ntListUseOriginalNotPropagateExceptionTest.java | 85 +++++++++++++++++
...istUseOriginalPropagateExceptionCaughtTest.java | 105 +++++++++++++++++++++
...pientListUseOriginalPropagateExceptionTest.java | 104 ++++++++++++++++++++
5 files changed, 321 insertions(+), 4 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index a67cee877818..f946becd4634 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AggregationStrategy;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -37,6 +38,8 @@ import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -188,6 +191,23 @@ public class RecipientListProcessor extends
MulticastProcessor {
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
+ @Override
+ public boolean process(Exchange exchange, final AsyncCallback callback) {
+ AggregationStrategy strategy = getAggregationStrategy();
+
+ // set original exchange if not already pre-configured
+ if (strategy instanceof UseOriginalAggregationStrategy original) {
+ // need to create a new private instance, as we can also have
concurrency issue so we cannot store state
+ AggregationStrategy clone = original.newInstance(exchange);
+ if (isShareUnitOfWork()) {
+ clone = new ShareUnitOfWorkAggregationStrategy(clone);
+ }
+ setAggregationStrategyOnExchange(exchange, clone);
+ }
+
+ return super.process(exchange, callback);
+ }
+
@Override
protected Iterable<ProcessorExchangePair>
createProcessorExchangePairs(Exchange exchange)
throws Exception {
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
index 17f4a95dd6c9..bee87834a9f8 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
@@ -61,25 +61,28 @@ public class UseOriginalAggregationStrategy implements
AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ Exchange target = oldExchange;
if (propagateException) {
Exception exception = checkException(oldExchange, newExchange);
if (exception != null) {
+ target = oldExchange != null ? oldExchange : newExchange;
if (original != null) {
original.setException(exception);
} else {
- oldExchange.setException(exception);
+ target.setException(exception);
}
}
exception = checkCaughtException(oldExchange, newExchange);
if (exception != null) {
+ target = oldExchange != null ? oldExchange : newExchange;
if (original != null) {
original.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
- } else if (oldExchange != null) {
- oldExchange.setProperty(Exchange.EXCEPTION_CAUGHT,
exception);
+ } else {
+ target.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
}
}
}
- return original != null ? original : oldExchange;
+ return original != null ? original : target;
}
protected Exception checkException(Exchange oldExchange, Exchange
newExchange) {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
new file mode 100644
index 000000000000..18076fd0c6b8
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class RecipientListUseOriginalNotPropagateExceptionTest extends
ContextTestSupport {
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "body",
+ "throw1",
+ "throw2",
+ "throw1;throw2"
+ })
+ public void testWithoutPropagation(String body) throws Exception {
+
+
getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1")
? 0 : 1);
+
getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2")
? 0 : 1);
+
+ getMockEndpoint("mock:result").expectedBodiesReceived(body);
+
+ template.sendBody("direct:start", body);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .errorHandler(noErrorHandler())
+
.recipientList(constant("direct:recipient1,direct:recipient2"))
+ .aggregationStrategy(new
UseOriginalAggregationStrategy(false))
+ .to("mock:result");
+
+ from("direct:recipient1")
+ .log("recipient1")
+ .choice()
+ .when(bodyAs(String.class).contains("throw1"))
+ .process(
+ exchange -> {
+ throw new RuntimeException("recipient1");
+ })
+ .end()
+ .setBody(constant("recipient1"))
+ .to("mock:recipient1");
+
+ from("direct:recipient2")
+ .log("recipient2")
+ .choice()
+ .when(bodyAs(String.class).contains("throw2"))
+ .process(
+ exchange -> {
+ throw new RuntimeException("recipient2");
+ })
+ .end()
+ .setBody(constant("recipient2"))
+ .to("mock:recipient2");
+ }
+ };
+ }
+
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
new file mode 100644
index 000000000000..abf4f0116c25
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class RecipientListUseOriginalPropagateExceptionCaughtTest extends
ContextTestSupport {
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "caught1",
+ "caught2",
+ })
+ public void testWithPropagation(String body) throws Exception {
+
+ getMockEndpoint("mock:recipient1").expectedMessageCount(1);
+ getMockEndpoint("mock:recipient2").expectedMessageCount(1);
+
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", body);
+
+ assertEquals(1,
getMockEndpoint("mock:result").getReceivedExchanges().size());
+ Exchange exchange =
getMockEndpoint("mock:result").getReceivedExchanges().get(0);
+ Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class);
+ assertNotNull(exception);
+ Throwable rootCause = exception;
+ while (rootCause.getCause() != null) {
+ rootCause = rootCause.getCause();
+ }
+ assertInstanceOf(RuntimeException.class, rootCause);
+
+ if (body.contains("caught1")) {
+ assertEquals("recipient1", rootCause.getMessage());
+ }
+ if (body.contains("caught2")) {
+ assertEquals("recipient2", rootCause.getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .errorHandler(noErrorHandler())
+
.recipientList(constant("direct:recipient1,direct:recipient2"))
+ .aggregationStrategy(new
UseOriginalAggregationStrategy(true))
+ .to("mock:result");
+
+ from("direct:recipient1")
+ .log("recipient1")
+ .choice()
+ .when(bodyAs(String.class).contains("caught1"))
+ .process(
+ exchange -> {
+
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new
RuntimeException("recipient1"));
+ })
+ .end()
+ .setBody(constant("recipient1"))
+ .to("mock:recipient1");
+
+ from("direct:recipient2")
+ .log("recipient2")
+ .choice()
+ .when(bodyAs(String.class).contains("caught2"))
+ .process(
+ exchange -> {
+
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new
RuntimeException("recipient2"));
+ })
+ .end()
+ .setBody(constant("recipient2"))
+ .to("mock:recipient2");
+ }
+ };
+ }
+
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
new file mode 100644
index 000000000000..9b243e628545
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RecipientListUseOriginalPropagateExceptionTest extends
ContextTestSupport {
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "throw1",
+ "throw2",
+ })
+ public void testWithPropagation(String body) throws Exception {
+
+
getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1")
? 0 : 1);
+
getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2")
? 0 : 1);
+
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ CamelExecutionException exception
+ = assertThrows(CamelExecutionException.class, () ->
template.sendBody("direct:start", body));
+
+ Throwable rootCause = exception;
+ while (rootCause.getCause() != null) {
+ rootCause = rootCause.getCause();
+ }
+ assertInstanceOf(RuntimeException.class, rootCause);
+
+ if (body.contains("throw1")) {
+ assertEquals("recipient1", rootCause.getMessage());
+ }
+ if (body.contains("throw2")) {
+ assertEquals("recipient2", rootCause.getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .errorHandler(noErrorHandler())
+
.recipientList(constant("direct:recipient1,direct:recipient2"))
+ .aggregationStrategy(new
UseOriginalAggregationStrategy(true))
+ .to("mock:result");
+
+ from("direct:recipient1")
+ .log("recipient1")
+ .choice()
+ .when(bodyAs(String.class).contains("throw1"))
+ .process(
+ exchange -> {
+
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new
RuntimeException("recipient1Caught"));
+ throw new RuntimeException("recipient1");
+ })
+ .end()
+ .setBody(constant("recipient1"))
+ .to("mock:recipient1");
+
+ from("direct:recipient2")
+ .log("recipient2")
+ .choice()
+ .when(bodyAs(String.class).contains("throw2"))
+ .process(
+ exchange -> {
+ throw new RuntimeException("recipient2");
+ })
+ .end()
+ .setBody(constant("recipient2"))
+ .to("mock:recipient2");
+ }
+ };
+ }
+
+}