This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 17fa7056b88 Fix multicast iterator to be lazy again (#14169)
17fa7056b88 is described below
commit 17fa7056b886f29b890fb75c34e321f9ac5f2d0e
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri May 17 08:22:13 2024 +0200
Fix multicast iterator to be lazy again (#14169)
If the last element of the iterable is null, the last message will not
contain the final information, such as splitter-size and multicast-complete.
---
.../apache/camel/processor/MulticastProcessor.java | 22 ++++++++++++++--------
.../java/org/apache/camel/processor/Splitter.java | 2 +-
...rParallelWithIteratorThrowingExceptionTest.java | 4 ++--
3 files changed, 17 insertions(+), 11 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 4f1fe2bdddb..78560ad781a 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -70,7 +70,6 @@ import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.FilterIterator;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AsyncCompletionService;
@@ -313,6 +312,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport
try {
pairs = createProcessorExchangePairs(exchange);
if (pairs instanceof Collection) {
+ pairs = ((Collection<ProcessorExchangePair>) pairs)
+ .stream().filter(Objects::nonNull).toList();
size = ((Collection<ProcessorExchangePair>) pairs).size();
}
} catch (Exception e) {
@@ -409,7 +410,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
this.original = original;
this.pairs = pairs;
this.callback = callback;
- this.iterator = new FilterIterator<>(pairs.iterator(),
Objects::nonNull);
+ this.iterator = pairs.iterator();
if (timeout > 0) {
timeoutTask = schedule(aggregateExecutorService,
this::timeout, timeout, TimeUnit.MILLISECONDS);
} else {
@@ -535,16 +536,13 @@ public class MulticastProcessor extends
AsyncProcessorSupport
return;
}
- // Check if the iterator is empty
- // This can happen the very first time we check the existence
- // of an item before queuing the run.
- // or some iterators may return true for hasNext() but then
null in next()
- if (!iterator.hasNext()) {
+ // Get next processor exchange pair to sent, skipping null ones
+ ProcessorExchangePair pair = getNextProcessorExchangePair();
+ if (pair == null) {
doDone(result.get(), true);
return;
}
- ProcessorExchangePair pair = iterator.next();
boolean hasNext = iterator.hasNext();
Exchange exchange = pair.getExchange();
@@ -605,6 +603,14 @@ public class MulticastProcessor extends
AsyncProcessorSupport
doDone(null, false);
}
}
+
+ private ProcessorExchangePair getNextProcessorExchangePair() {
+ ProcessorExchangePair tpair = null;
+ while (tpair == null && iterator.hasNext()) {
+ tpair = iterator.next();
+ }
+ return tpair;
+ }
}
/**
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 72c557ccbe2..0b513472ee0 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -295,7 +295,7 @@ public class Splitter extends MulticastProcessor implements
AsyncProcessor, Trac
protected void updateNewExchange(Exchange exchange, int index,
Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index);
if (allPairs instanceof Collection) {
- // non streaming mode, so we know the total size already
+ // non-streaming mode, so we know the total size already
exchange.setProperty(ExchangePropertyKey.SPLIT_SIZE,
((Collection<?>) allPairs).size());
}
if (hasNext) {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
index ccd568d9572..517d52bc459 100644
---
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
@@ -49,7 +49,7 @@ public class
SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
@Test
public void testIteratorThrowExceptionOnSecond() throws Exception {
- getMockEndpoint("mock:line").expectedMessageCount(0);
+ getMockEndpoint("mock:line").expectedMessageCount(1);
getMockEndpoint("mock:end").expectedMessageCount(0);
try {
@@ -65,7 +65,7 @@ public class
SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
@Test
public void testIteratorThrowExceptionOnThird() throws Exception {
- getMockEndpoint("mock:line").expectedMessageCount(1);
+ getMockEndpoint("mock:line").expectedMessageCount(2);
getMockEndpoint("mock:end").expectedMessageCount(0);
try {