This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch uow
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2c4e247c49b19d44846d332d9a0c1832ec93d9c4
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jul 22 15:24:29 2025 +0200

    CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork should 
use single uow
---
 .../apache/camel/spi/InternalProcessorFactory.java |   3 -
 .../camel/impl/engine/CamelInternalProcessor.java  |  20 ----
 .../processor/DefaultInternalProcessorFactory.java |   8 --
 .../apache/camel/processor/MulticastProcessor.java |  12 +--
 .../processor/SplitterShareUnitOfWorkTest.java     | 118 +++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc    |  11 ++
 6 files changed, 133 insertions(+), 39 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index 0d2424ddd0f..e9486263fee 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -43,9 +43,6 @@ public interface InternalProcessorFactory {
 
     InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, 
Processor processor, Route route);
 
-    InternalProcessor addChildUnitOfWorkProcessorAdvice(
-            CamelContext camelContext, Processor processor, Route route, 
UnitOfWork parent);
-
     SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext 
camelContext);
 
     Channel createChannel(CamelContext camelContext);
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 37f0f8b536a..b4764f615d5 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -892,26 +892,6 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
 
     }
 
-    /**
-     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
-     */
-    public static class ChildUnitOfWorkProcessorAdvice extends 
UnitOfWorkProcessorAdvice {
-
-        private final UnitOfWork parent;
-
-        public ChildUnitOfWorkProcessorAdvice(Route route, CamelContext 
camelContext, UnitOfWork parent) {
-            super(route, camelContext);
-            this.parent = parent;
-        }
-
-        @Override
-        protected UnitOfWork createUnitOfWork(Exchange exchange) {
-            // let the parent create a child unit of work to be used
-            return parent.createChildUnitOfWork(exchange);
-        }
-
-    }
-
     /**
      * Advice when Message History has been enabled.
      */
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
index 74118ab510a..6db4a56608a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java
@@ -32,7 +32,6 @@ import org.apache.camel.spi.InterceptSendToEndpoint;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.InternalProcessorFactory;
 import org.apache.camel.spi.SharedInternalProcessor;
-import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.spi.annotations.JdkService;
 
 @JdkService(InternalProcessorFactory.FACTORY)
@@ -44,13 +43,6 @@ public class DefaultInternalProcessorFactory implements 
InternalProcessorFactory
         return internal;
     }
 
-    public InternalProcessor addChildUnitOfWorkProcessorAdvice(
-            CamelContext camelContext, Processor processor, Route route, 
UnitOfWork parent) {
-        CamelInternalProcessor internal = new 
CamelInternalProcessor(camelContext, processor);
-        internal.addAdvice(new 
CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(route, camelContext, 
parent));
-        return internal;
-    }
-
     public SharedInternalProcessor 
createSharedCamelInternalProcessor(CamelContext camelContext) {
         return new SharedCamelInternalProcessor(
                 camelContext, new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext));
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index dbaf06d26f3..7d18a3a1b5d 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -302,7 +302,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 wrapInErrorHandler(route, exchange, processor);
             }
         }
-
         ServiceHelper.initService(processorExchangeFactory);
     }
 
@@ -1125,18 +1124,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
      */
     protected Processor createUnitOfWorkProcessor(Route route, Processor 
processor, Exchange exchange) {
         // and wrap it in a unit of work so the UoW is on the top, so the 
entire route will be in the same UoW
-        UnitOfWork parent = 
exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class);
-        if (parent != null) {
-            return 
internalProcessorFactory.addChildUnitOfWorkProcessorAdvice(camelContext, 
processor, route, parent);
-        } else {
-            return 
internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, 
route);
-        }
+        return 
internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, 
route);
     }
 
     /**
      * Prepares the exchange for participating in a shared unit of work
      * <p/>
-     * This ensures a child exchange can access its parent {@link UnitOfWork} 
when it participate in a shared unit of
+     * This ensures a child exchange can access its parent {@link UnitOfWork} 
when it participates in a shared unit of
      * work.
      *
      * @param childExchange  the child exchange
@@ -1144,6 +1138,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
      */
     protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange 
parentExchange) {
         childExchange.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, 
parentExchange.getUnitOfWork());
+        // and then share the unit of work
+        
childExchange.getExchangeExtension().setUnitOfWork(parentExchange.getUnitOfWork());
     }
 
     @Override
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
new file mode 100644
index 00000000000..c19d70727a4
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SplitterShareUnitOfWorkTest extends ContextTestSupport {
+
+    private final List<UnitOfWork> uows = new ArrayList<>();
+    private final List<UnitOfWork> parentUows = new ArrayList<>();
+    private final List<UnitOfWork> doneUows = new ArrayList<>();
+    private final List<String> doneBodies = new ArrayList<>();
+
+    @Test
+    public void testShareUnitOfWork() throws Exception {
+        getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+        getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C");
+
+        template.sendBody("direct:start", "A,B,C");
+
+        assertMockEndpointsSatisfied();
+
+        Assertions.assertEquals(3, uows.size());
+        Assertions.assertEquals(3, parentUows.size());
+
+        // should all be same child uow
+        Assertions.assertSame(uows.get(0), uows.get(1));
+        Assertions.assertSame(uows.get(1), uows.get(2));
+        Assertions.assertSame(uows.get(2), uows.get(0));
+
+        // should all be same parent uow
+        Assertions.assertSame(parentUows.get(0), parentUows.get(1));
+        Assertions.assertSame(parentUows.get(1), parentUows.get(2));
+        Assertions.assertSame(parentUows.get(2), parentUows.get(0));
+
+        // and parent/child should also be same
+        Assertions.assertSame(uows.get(0), parentUows.get(0));
+        Assertions.assertSame(uows.get(1), parentUows.get(1));
+        Assertions.assertSame(uows.get(2), parentUows.get(2));
+
+        // and done uow should be the same
+        Assertions.assertSame(uows.get(0), doneUows.get(0));
+        Assertions.assertSame(uows.get(1), doneUows.get(1));
+        Assertions.assertSame(uows.get(2), doneUows.get(2));
+
+        // uow is done after the entire route so the exchange body is the 
output from the aggregation strategy
+        Assertions.assertEquals("A+B+C", doneBodies.get(0));
+        Assertions.assertEquals("A+B+C", doneBodies.get(1));
+        Assertions.assertEquals("A+B+C", doneBodies.get(2));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").split(body(), new 
MyStrategy()).shareUnitOfWork()
+                        .process(e -> {
+                            var u = e.getUnitOfWork();
+                            var u2 = 
e.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
+                            uows.add(u);
+                            parentUows.add(u2);
+                            u.addSynchronization(new SynchronizationAdapter() {
+                                @Override
+                                public void onDone(Exchange exchange) {
+                                    var b = 
exchange.getMessage().getBody(String.class);
+                                    doneBodies.add(b);
+                                    var u = exchange.getUnitOfWork();
+                                    doneUows.add(u);
+
+                                    // should only be invoked after all is 
complete (3 line and 1 result)
+                                    Assertions.assertEquals(3, 
getMockEndpoint("mock:line").getReceivedCounter());
+                                    Assertions.assertEquals(1, 
getMockEndpoint("mock:result").getReceivedCounter());
+                                }
+                            });
+                        })
+                        .to("mock:line").end().to("mock:result");
+            }
+        };
+    }
+
+    private static class MyStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body = oldExchange.getIn().getBody() + "+" + 
newExchange.getIn().getBody();
+            oldExchange.getIn().setBody(body);
+            return oldExchange;
+        }
+    }
+
+}
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
index 848acb59ebd..d822ae11aba 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
@@ -8,6 +8,17 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
 
 === camel-core
 
+==== Splitter and Multicast EIPs
+
+When using `shareUnitOfWork=true` in Split or Multicast EIPs, then Camel will 
now use a single shared `UnitOfWork` instance (parent)
+for the entire body of work. So if the Splitter is splitting into 1000 sub 
messages, then each of them will now reuse
+the same `UnitOfWork` and any completion tasks that each sub messages, will 
now be executed later, when the parent `UnitOfWork`
+is complete, usually when the original message is completed.
+
+Previously, each sub-message was independent (despite the documentation refers 
to this not being the case). However this feature
+has been mistakenly for many years, as this feature is rarely in use. However, 
we had the opportunity to look into this as part
+of an issue, and felt it's better to fix this before for this LTS release.
+
 === camel-jbang
 
 The `camel export` will not include `camel-observabilities-services` out of 
the box. To include this, then use `--observe` to enable

Reply via email to