This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit dcff7fee1df493229d1ac114ca73a3957c7174b0
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon May 4 12:38:10 2020 +0200

    CAMEL-14996: Splitter/Multicast EIP can cause a thread to starve from 
endless stack frames when splitting, as it does not collapse its stack frames 
but keeps scheduling the next split/task.
---
 .../apache/camel/processor/MulticastProcessor.java | 72 +++++++++++++++++++++-
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 63106f2..d0b1e2c 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
@@ -285,6 +286,66 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         }
     }
 
+    /**
+     * Minimal completion-service abstraction used by the multicast tasks: work is handed in via
+     * {@link #submit(Consumer)} and finished exchanges are taken out again via the poll methods.
+     */
+    private interface MulticastCompletionService {
+
+        /**
+         * Takes the next available exchange, if any.
+         *
+         * @return the next exchange, or {@code null} when none is currently available
+         */
+        Exchange poll();
+
+        /**
+         * Variant of {@link #poll()} that presumably ignores submission order - the exact
+         * semantics are defined by the implementation (NOTE(review): confirm).
+         *
+         * @return the next exchange, or {@code null} when none is currently available
+         */
+        Exchange pollUnordered();
+
+        /**
+         * Submits a unit of work.
+         *
+         * @param runner the work to perform; it is given a {@link Consumer} to which it must
+         *               pass the resulting exchange
+         */
+        void submit(Consumer<Consumer<Exchange>> runner);
+
+    }
+
+    /**
+     * {@link MulticastCompletionService} that delegates every operation to an
+     * {@link AsyncCompletionService}. It is the implementation installed by
+     * {@code MulticastParallelTask}.
+     */
+    private class MulticastCompletionServiceParallelTask implements MulticastCompletionService {
+        private final AsyncCompletionService<Exchange> completion;
+
+        /**
+         * Creates the underlying {@link AsyncCompletionService} from this processor's
+         * {@code schedule} method, the negated {@code isStreaming()} flag and the given lock.
+         *
+         * @param lock the lock handed to the underlying completion service
+         */
+        public MulticastCompletionServiceParallelTask(ReentrantLock lock) {
+            this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock);
+        }
+
+        /**
+         * @return whatever the underlying completion service's {@code poll()} returns
+         */
+        @Override
+        public Exchange poll() {
+            return completion.poll();
+        }
+
+        /**
+         * @return whatever the underlying completion service's {@code pollUnordered()} returns
+         */
+        @Override
+        public Exchange pollUnordered() {
+            return completion.pollUnordered();
+        }
+
+        /**
+         * Forwards the work to the underlying completion service.
+         *
+         * @param runner the work to submit
+         */
+        @Override
+        public void submit(Consumer<Consumer<Exchange>> runner) {
+            completion.submit(runner);
+        }
+    }
+
+    /**
+     * {@link MulticastCompletionService} that invokes submitted work directly on the calling
+     * thread and keeps at most one result, which the next poll takes and clears.
+     */
+    private class MulticastCompletionServiceTask implements MulticastCompletionService {
+
+        // single slot holding the most recently stored result until it is polled
+        private final AtomicReference<Exchange> pending = new AtomicReference<>();
+
+        public MulticastCompletionServiceTask() {
+        }
+
+        @Override
+        public void submit(Consumer<Consumer<Exchange>> runner) {
+            // invoke the work right away and let it report its result into the slot
+            runner.accept(this::setResult);
+        }
+
+        private void setResult(Exchange result) {
+            pending.set(result);
+        }
+
+        @Override
+        public Exchange poll() {
+            // take the stored exchange (may be null) and empty the slot atomically
+            return pending.getAndSet(null);
+        }
+
+        @Override
+        public Exchange pollUnordered() {
+            // only a single result is ever held, so this is the same as poll()
+            return poll();
+        }
+    }
+
     protected class MulticastTask implements Runnable {
 
         final Exchange original;
@@ -292,7 +353,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
         final AsyncCallback callback;
         final Iterator<ProcessorExchangePair> iterator;
         final ReentrantLock lock;
-        final AsyncCompletionService<Exchange> completion;
+        MulticastCompletionService completion;
         final AtomicReference<Exchange> result;
         final AtomicInteger nbExchangeSent = new AtomicInteger();
         final AtomicInteger nbAggregated = new AtomicInteger();
@@ -305,7 +366,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             this.callback = callback;
             this.iterator = pairs.iterator();
             this.lock = new ReentrantLock();
-            this.completion = new 
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), 
lock);
+            this.completion = new MulticastCompletionServiceTask();
             this.result = new AtomicReference<>();
         }
 
@@ -314,6 +375,10 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             return "MulticastTask";
         }
 
+        /**
+         * Polls this task's completion service for the next exchange to aggregate.
+         *
+         * @return the result of {@code completion.poll()}; the aggregation loop stops when this is {@code null}
+         */
+        private Exchange completionPoll() {
+            return completion.poll();
+        }
+
         @Override
         public void run() {
             try {
@@ -393,7 +458,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
             if (lock.tryLock()) {
                 try {
                     Exchange exchange;
-                    while (!done.get() && (exchange = completion.poll()) != 
null) {
+                    while (!done.get() && (exchange = completionPoll()) != 
null) {
                         doAggregate(result, exchange, original);
                         if (nbAggregated.incrementAndGet() >= 
nbExchangeSent.get() && allSent.get()) {
                             doDone(result.get(), true);
@@ -420,6 +485,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport implements Navigat
 
         MulticastParallelTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
+            this.completion = new MulticastCompletionServiceParallelTask(lock);
             if (timeout > 0) {
                 schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
             }

Reply via email to