This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch uow in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2c4e247c49b19d44846d332d9a0c1832ec93d9c4 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jul 22 15:24:29 2025 +0200 CAMEL-22259: camel-core - Splitter/Multicast with shareUnitOfWork should use single uow --- .../apache/camel/spi/InternalProcessorFactory.java | 3 - .../camel/impl/engine/CamelInternalProcessor.java | 20 ---- .../processor/DefaultInternalProcessorFactory.java | 8 -- .../apache/camel/processor/MulticastProcessor.java | 12 +-- .../processor/SplitterShareUnitOfWorkTest.java | 118 +++++++++++++++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc | 11 ++ 6 files changed, 133 insertions(+), 39 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index 0d2424ddd0f..e9486263fee 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -43,9 +43,6 @@ public interface InternalProcessorFactory { InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route); - InternalProcessor addChildUnitOfWorkProcessorAdvice( - CamelContext camelContext, Processor processor, Route route, UnitOfWork parent); - SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext); Channel createChannel(CamelContext camelContext); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 37f0f8b536a..b4764f615d5 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -892,26 +892,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } - /** - * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality. - */ - public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice { - - private final UnitOfWork parent; - - public ChildUnitOfWorkProcessorAdvice(Route route, CamelContext camelContext, UnitOfWork parent) { - super(route, camelContext); - this.parent = parent; - } - - @Override - protected UnitOfWork createUnitOfWork(Exchange exchange) { - // let the parent create a child unit of work to be used - return parent.createChildUnitOfWork(exchange); - } - - } - /** * Advice when Message History has been enabled. */ diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java index 74118ab510a..6db4a56608a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java @@ -32,7 +32,6 @@ import org.apache.camel.spi.InterceptSendToEndpoint; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.InternalProcessorFactory; import org.apache.camel.spi.SharedInternalProcessor; -import org.apache.camel.spi.UnitOfWork; import org.apache.camel.spi.annotations.JdkService; @JdkService(InternalProcessorFactory.FACTORY) @@ -44,13 +43,6 @@ public class DefaultInternalProcessorFactory implements InternalProcessorFactory return internal; } - public InternalProcessor addChildUnitOfWorkProcessorAdvice( - CamelContext camelContext, Processor processor, Route route, UnitOfWork parent) { - CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, processor); - internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(route, camelContext, parent)); - return internal; - } - public SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext) { return new SharedCamelInternalProcessor( camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext)); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index dbaf06d26f3..7d18a3a1b5d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -302,7 +302,6 @@ public class MulticastProcessor extends AsyncProcessorSupport wrapInErrorHandler(route, exchange, processor); } } - ServiceHelper.initService(processorExchangeFactory); } @@ -1125,18 +1124,13 @@ public class MulticastProcessor extends AsyncProcessorSupport */ protected Processor createUnitOfWorkProcessor(Route route, Processor processor, Exchange exchange) { // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW - UnitOfWork parent = exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class); - if (parent != null) { - return internalProcessorFactory.addChildUnitOfWorkProcessorAdvice(camelContext, processor, route, parent); - } else { - return internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, route); - } + return internalProcessorFactory.addUnitOfWorkProcessorAdvice(camelContext, processor, route); } /** * Prepares the exchange for participating in a shared unit of work * <p/> - * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate in a shared unit of + * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates in a shared unit of * work. * * @param childExchange the child exchange @@ -1144,6 +1138,8 @@ public class MulticastProcessor extends AsyncProcessorSupport */ protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { childExchange.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); + // and then share the unit of work + childExchange.getExchangeExtension().setUnitOfWork(parentExchange.getUnitOfWork()); } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java new file mode 100644 index 00000000000..c19d70727a4 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitterShareUnitOfWorkTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.SynchronizationAdapter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SplitterShareUnitOfWorkTest extends ContextTestSupport { + + private final List<UnitOfWork> uows = new ArrayList<>(); + private final List<UnitOfWork> parentUows = new ArrayList<>(); + private final List<UnitOfWork> doneUows = new ArrayList<>(); + private final List<String> doneBodies = new ArrayList<>(); + + @Test + public void testShareUnitOfWork() throws Exception { + getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C"); + getMockEndpoint("mock:result").expectedBodiesReceived("A+B+C"); + + template.sendBody("direct:start", "A,B,C"); + + assertMockEndpointsSatisfied(); + + Assertions.assertEquals(3, uows.size()); + Assertions.assertEquals(3, parentUows.size()); + + // should all be same child uow + Assertions.assertSame(uows.get(0), uows.get(1)); + Assertions.assertSame(uows.get(1), uows.get(2)); + Assertions.assertSame(uows.get(2), uows.get(0)); + + // should all be same parent uow + Assertions.assertSame(parentUows.get(0), parentUows.get(1)); + Assertions.assertSame(parentUows.get(1), parentUows.get(2)); + Assertions.assertSame(parentUows.get(2), parentUows.get(0)); + + // and parent/child should also be same + Assertions.assertSame(uows.get(0), parentUows.get(0)); + Assertions.assertSame(uows.get(1), parentUows.get(1)); + Assertions.assertSame(uows.get(2), parentUows.get(2)); + + // and done uow should be the same + Assertions.assertSame(uows.get(0), doneUows.get(0)); + Assertions.assertSame(uows.get(1), doneUows.get(1)); + Assertions.assertSame(uows.get(2), doneUows.get(2)); + + // uow is done after the entire route so the exchange body is the output from the aggregation strategy + Assertions.assertEquals("A+B+C", doneBodies.get(0)); + Assertions.assertEquals("A+B+C", doneBodies.get(1)); + Assertions.assertEquals("A+B+C", doneBodies.get(2)); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").split(body(), new MyStrategy()).shareUnitOfWork() + .process(e -> { + var u = e.getUnitOfWork(); + var u2 = e.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); + uows.add(u); + parentUows.add(u2); + u.addSynchronization(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + var b = exchange.getMessage().getBody(String.class); + doneBodies.add(b); + var u = exchange.getUnitOfWork(); + doneUows.add(u); + + // should only be invoked after all is complete (3 line and 1 result) + Assertions.assertEquals(3, getMockEndpoint("mock:line").getReceivedCounter()); + Assertions.assertEquals(1, getMockEndpoint("mock:result").getReceivedCounter()); + } + }); + }) + .to("mock:line").end().to("mock:result"); + } + }; + } + + private static class MyStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body = oldExchange.getIn().getBody() + "+" + newExchange.getIn().getBody(); + oldExchange.getIn().setBody(body); + return oldExchange; + } + } + +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc index 848acb59ebd..d822ae11aba 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc @@ -8,6 +8,17 @@ from both 4.0 to 4.1 and 4.1 to 4.2. === camel-core +==== Splitter and Multicast EIPs + +When using `shareUnitOfWork=true` in Split or Multicast EIPs, then Camel will now use a single shared `UnitOfWork` instance (parent) +for the entire body of work. So if the Splitter is splitting into 1000 sub messages, then each of them will now reuse +the same `UnitOfWork` and any completion tasks that each sub messages, will now be executed later, when the parent `UnitOfWork` +is complete, usually when the original message is completed. + +Previously, each sub-message was independent (despite the documentation refers to this not being the case). However this feature +has been mistakenly for many years, as this feature is rarely in use. However, we had the opportunity to look into this as part +of an issue, and felt it's better to fix this before for this LTS release. + === camel-jbang The `camel export` will not include `camel-observabilities-services` out of the box. To include this, then use `--observe` to enable
