This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 05d51559bc56 CAMEL-22950: Ensure RecipientList propagates the original 
exchange when using UseOriginalAggregationStrategy (#21266)
05d51559bc56 is described below

commit 05d51559bc5681aab92e5d23240ae92e2b74dee8
Author: David Riseley <[email protected]>
AuthorDate: Thu Feb 5 13:47:09 2026 +0000

    CAMEL-22950: Ensure RecipientList propagates the original exchange when 
using UseOriginalAggregationStrategy (#21266)
    
    Ensure UseOriginalAggregationStrategy handles missing old exchange
---
 .../camel/processor/RecipientListProcessor.java    |  20 ++++
 .../aggregate/UseOriginalAggregationStrategy.java  |  11 ++-
 ...ntListUseOriginalNotPropagateExceptionTest.java |  85 +++++++++++++++++
 ...istUseOriginalPropagateExceptionCaughtTest.java | 105 +++++++++++++++++++++
 ...pientListUseOriginalPropagateExceptionTest.java | 104 ++++++++++++++++++++
 5 files changed, 321 insertions(+), 4 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index a67cee877818..f946becd4634 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProducer;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -37,6 +38,8 @@ import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Route;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.spi.NormalizedEndpointUri;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -188,6 +191,23 @@ public class RecipientListProcessor extends 
MulticastProcessor {
         this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
     }
 
+    @Override
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        AggregationStrategy strategy = getAggregationStrategy();
+
+        // set original exchange if not already pre-configured
+        if (strategy instanceof UseOriginalAggregationStrategy original) {
+            // need to create a new private instance, as we can also have 
concurrency issue so we cannot store state
+            AggregationStrategy clone = original.newInstance(exchange);
+            if (isShareUnitOfWork()) {
+                clone = new ShareUnitOfWorkAggregationStrategy(clone);
+            }
+            setAggregationStrategyOnExchange(exchange, clone);
+        }
+
+        return super.process(exchange, callback);
+    }
+
     @Override
     protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange)
             throws Exception {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
index 17f4a95dd6c9..bee87834a9f8 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/UseOriginalAggregationStrategy.java
@@ -61,25 +61,28 @@ public class UseOriginalAggregationStrategy implements 
AggregationStrategy {
 
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        Exchange target = oldExchange;
         if (propagateException) {
             Exception exception = checkException(oldExchange, newExchange);
             if (exception != null) {
+                target = oldExchange != null ? oldExchange : newExchange;
                 if (original != null) {
                     original.setException(exception);
                 } else {
-                    oldExchange.setException(exception);
+                    target.setException(exception);
                 }
             }
             exception = checkCaughtException(oldExchange, newExchange);
             if (exception != null) {
+                target = oldExchange != null ? oldExchange : newExchange;
                 if (original != null) {
                     original.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
-                } else if (oldExchange != null) {
-                    oldExchange.setProperty(Exchange.EXCEPTION_CAUGHT, 
exception);
+                } else {
+                    target.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
                 }
             }
         }
-        return original != null ? original : oldExchange;
+        return original != null ? original : target;
     }
 
     protected Exception checkException(Exchange oldExchange, Exchange 
newExchange) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
new file mode 100644
index 000000000000..18076fd0c6b8
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalNotPropagateExceptionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class RecipientListUseOriginalNotPropagateExceptionTest extends 
ContextTestSupport {
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "body",
+            "throw1",
+            "throw2",
+            "throw1;throw2"
+    })
+    public void testWithoutPropagation(String body) throws Exception {
+
+        
getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1") 
? 0 : 1);
+        
getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2") 
? 0 : 1);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived(body);
+
+        template.sendBody("direct:start", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+
+                from("direct:start")
+                        .errorHandler(noErrorHandler())
+                        
.recipientList(constant("direct:recipient1,direct:recipient2"))
+                        .aggregationStrategy(new 
UseOriginalAggregationStrategy(false))
+                        .to("mock:result");
+
+                from("direct:recipient1")
+                        .log("recipient1")
+                        .choice()
+                        .when(bodyAs(String.class).contains("throw1"))
+                        .process(
+                                exchange -> {
+                                    throw new RuntimeException("recipient1");
+                                })
+                        .end()
+                        .setBody(constant("recipient1"))
+                        .to("mock:recipient1");
+
+                from("direct:recipient2")
+                        .log("recipient2")
+                        .choice()
+                        .when(bodyAs(String.class).contains("throw2"))
+                        .process(
+                                exchange -> {
+                                    throw new RuntimeException("recipient2");
+                                })
+                        .end()
+                        .setBody(constant("recipient2"))
+                        .to("mock:recipient2");
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
new file mode 100644
index 000000000000..abf4f0116c25
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionCaughtTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class RecipientListUseOriginalPropagateExceptionCaughtTest extends 
ContextTestSupport {
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "caught1",
+            "caught2",
+    })
+    public void testWithPropagation(String body) throws Exception {
+
+        getMockEndpoint("mock:recipient1").expectedMessageCount(1);
+        getMockEndpoint("mock:recipient2").expectedMessageCount(1);
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", body);
+
+        assertEquals(1, 
getMockEndpoint("mock:result").getReceivedExchanges().size());
+        Exchange exchange = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0);
+        Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class);
+        assertNotNull(exception);
+        Throwable rootCause = exception;
+        while (rootCause.getCause() != null) {
+            rootCause = rootCause.getCause();
+        }
+        assertInstanceOf(RuntimeException.class, rootCause);
+
+        if (body.contains("caught1")) {
+            assertEquals("recipient1", rootCause.getMessage());
+        }
+        if (body.contains("caught2")) {
+            assertEquals("recipient2", rootCause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+
+                from("direct:start")
+                        .errorHandler(noErrorHandler())
+                        
.recipientList(constant("direct:recipient1,direct:recipient2"))
+                        .aggregationStrategy(new 
UseOriginalAggregationStrategy(true))
+                        .to("mock:result");
+
+                from("direct:recipient1")
+                        .log("recipient1")
+                        .choice()
+                        .when(bodyAs(String.class).contains("caught1"))
+                        .process(
+                                exchange -> {
+                                    
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new 
RuntimeException("recipient1"));
+                                })
+                        .end()
+                        .setBody(constant("recipient1"))
+                        .to("mock:recipient1");
+
+                from("direct:recipient2")
+                        .log("recipient2")
+                        .choice()
+                        .when(bodyAs(String.class).contains("caught2"))
+                        .process(
+                                exchange -> {
+                                    
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new 
RuntimeException("recipient2"));
+                                })
+                        .end()
+                        .setBody(constant("recipient2"))
+                        .to("mock:recipient2");
+            }
+        };
+    }
+
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
new file mode 100644
index 000000000000..9b243e628545
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListUseOriginalPropagateExceptionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RecipientListUseOriginalPropagateExceptionTest extends 
ContextTestSupport {
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "throw1",
+            "throw2",
+    })
+    public void testWithPropagation(String body) throws Exception {
+
+        
getMockEndpoint("mock:recipient1").expectedMessageCount(body.contains("throw1") 
? 0 : 1);
+        
getMockEndpoint("mock:recipient2").expectedMessageCount(body.contains("throw2") 
? 0 : 1);
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        CamelExecutionException exception
+                = assertThrows(CamelExecutionException.class, () -> 
template.sendBody("direct:start", body));
+
+        Throwable rootCause = exception;
+        while (rootCause.getCause() != null) {
+            rootCause = rootCause.getCause();
+        }
+        assertInstanceOf(RuntimeException.class, rootCause);
+
+        if (body.contains("throw1")) {
+            assertEquals("recipient1", rootCause.getMessage());
+        }
+        if (body.contains("throw2")) {
+            assertEquals("recipient2", rootCause.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+
+                from("direct:start")
+                        .errorHandler(noErrorHandler())
+                        
.recipientList(constant("direct:recipient1,direct:recipient2"))
+                        .aggregationStrategy(new 
UseOriginalAggregationStrategy(true))
+                        .to("mock:result");
+
+                from("direct:recipient1")
+                        .log("recipient1")
+                        .choice()
+                        .when(bodyAs(String.class).contains("throw1"))
+                        .process(
+                                exchange -> {
+                                    
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, new 
RuntimeException("recipient1Caught"));
+                                    throw new RuntimeException("recipient1");
+                                })
+                        .end()
+                        .setBody(constant("recipient1"))
+                        .to("mock:recipient1");
+
+                from("direct:recipient2")
+                        .log("recipient2")
+                        .choice()
+                        .when(bodyAs(String.class).contains("throw2"))
+                        .process(
+                                exchange -> {
+                                    throw new RuntimeException("recipient2");
+                                })
+                        .end()
+                        .setBody(constant("recipient2"))
+                        .to("mock:recipient2");
+            }
+        };
+    }
+
+}

Reply via email to