This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch fix/seda-virtual-thread-shutdown
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a82a708277028645375fc3dc1a278193d548f79b
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Mar 5 12:29:43 2026 +0100

    CAMEL-23129: Fix SEDA virtual thread shutdown hang
    
    Three issues caused the SEDA consumer to hang indefinitely during
    shutdown when using virtualThreadPerTask mode:
    
    1. SedaConsumer.prepareShutdown() called latch.await() with no timeout.
       Now uses the shutdown strategy timeout to prevent indefinite hangs.
    
    2. ThreadPerTaskSedaConsumer inherited the parent's latch count which
       was set to concurrentConsumers (0 for unlimited virtual threads).
       With count=0, no coordinator task was submitted so messages were
       never consumed. Now overrides getLatchCount() to return 1 (the
       coordinator thread) and setupTasks() to always start exactly one
       coordinator regardless of the concurrentConsumers setting.
    
    3. ThreadPerTaskSedaConsumer did not wait for active processing tasks
       during shutdown. Now overrides prepareShutdown() to also await
       termination of the task executor after the coordinator finishes.
    
    Also re-enables ThreadPerTaskSedaConsumerTest (was @Disabled).
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../apache/camel/component/seda/SedaConsumer.java  | 31 ++++++++++++++---
 .../component/seda/ThreadPerTaskSedaConsumer.java  | 40 ++++++++++++++++++++--
 .../seda/ThreadPerTaskSedaConsumerTest.java        |  2 --
 3 files changed, 64 insertions(+), 9 deletions(-)

diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 845fd9045513..cdb96f7fd2b9 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -49,7 +49,7 @@ public class SedaConsumer extends DefaultConsumer implements 
Runnable, ShutdownA
     private static final Logger LOG = 
LoggerFactory.getLogger(SedaConsumer.class);
 
     private final AtomicInteger taskCount = new AtomicInteger();
-    private volatile CountDownLatch latch;
+    protected volatile CountDownLatch latch;
     private volatile boolean shutdownPending;
     private volatile boolean forceShutdown;
     private ExecutorService executor;
@@ -108,11 +108,17 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         forceShutdown = forced;
 
         if (latch != null) {
-            LOG.debug("Preparing to shutdown, waiting for {} consumer threads 
to complete.", latch.getCount());
+            long count = latch.getCount();
+            LOG.debug("Preparing to shutdown, waiting for {} consumer threads 
to complete.", count);
 
-            // wait for all threads to end
+            // wait for all threads to end with a timeout to prevent 
indefinite hangs
             try {
-                latch.await();
+                long timeout = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+                TimeUnit timeUnit = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+                if (!latch.await(timeout, timeUnit)) {
+                    LOG.warn("Timeout waiting for {} consumer threads to 
complete during shutdown (waited {} {}).",
+                            latch.getCount(), timeout, timeUnit);
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -348,7 +354,7 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+        latch = new CountDownLatch(getLatchCount());
         shutdownPending = false;
         forceShutdown = false;
 
@@ -356,6 +362,14 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         getEndpoint().onStarted(this);
     }
 
+    /**
+     * Returns the number of consumer threads expected for the shutdown latch. 
Subclasses can override this to provide a
+     * different count (e.g., thread-per-task mode only needs 1 for the 
coordinator).
+     */
+    protected int getLatchCount() {
+        return getEndpoint().getConcurrentConsumers();
+    }
+
     @Override
     protected void doSuspend() throws Exception {
         getEndpoint().onStopped(this);
@@ -385,6 +399,13 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         shutdownExecutor();
     }
 
+    /**
+     * Sets the executor service used for consumer threads. Subclasses can use 
this to provide their own executor.
+     */
+    protected void setExecutor(ExecutorService executor) {
+        this.executor = executor;
+    }
+
     /**
      * Shuts down the executor service.
      */
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
index 0f190a414742..f18b951d03d8 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
@@ -60,6 +60,13 @@ public class ThreadPerTaskSedaConsumer extends SedaConsumer {
         this.maxConcurrentTasks = endpoint.getConcurrentConsumers();
     }
 
+    @Override
+    protected int getLatchCount() {
+        // Thread-per-task mode uses exactly 1 coordinator thread,
+        // regardless of the concurrentConsumers setting (which is a 
concurrency limit, not a thread count)
+        return 1;
+    }
+
     @Override
     protected ExecutorService createExecutor(int poolSize) {
         // Create a single-thread executor for the coordinator
@@ -80,13 +87,42 @@ public class ThreadPerTaskSedaConsumer extends SedaConsumer 
{
             LOG.debug("Using concurrency limit of {} for thread-per-task 
consumer", maxConcurrentTasks);
         }
 
-        // Call parent to create the coordinator executor and start it
-        super.setupTasks();
+        // Create the coordinator executor and start exactly one coordinator 
task.
+        // We cannot use super.setupTasks() because it uses 
concurrentConsumers as the
+        // task count, but for thread-per-task mode we always need exactly 1 
coordinator.
+        ExecutorService executor = createExecutor(1);
+        setExecutor(executor);
+        LOG.debug("Creating 1 coordinator task with poll timeout {} ms.", 
pollTimeout);
+        executor.execute(this);
 
         LOG.info("Started thread-per-task SEDA consumer for {} 
(maxConcurrent={})",
                 getEndpoint().getEndpointUri(), maxConcurrentTasks > 0 ? 
maxConcurrentTasks : "unlimited");
     }
 
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // First wait for the coordinator to finish (parent handles latch)
+        super.prepareShutdown(suspendOnly, forced);
+
+        // Then wait for any active tasks to complete
+        if (!suspendOnly && taskExecutor != null) {
+            long activeCount = activeTasks.sum();
+            if (activeCount > 0) {
+                LOG.debug("Waiting for {} active tasks to complete.", 
activeCount);
+                long timeout = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+                TimeUnit timeUnit = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+                try {
+                    taskExecutor.shutdown();
+                    if (!taskExecutor.awaitTermination(timeout, timeUnit)) {
+                        LOG.warn("Timeout waiting for {} active tasks to 
complete during shutdown.", activeTasks.sum());
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
     @Override
     protected void shutdownExecutor() {
         super.shutdownExecutor();
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
index b0f6e8c3890d..f1725fdc6377 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
@@ -19,13 +19,11 @@ package org.apache.camel.component.seda;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test for the virtualThreadPerTask mode of SEDA consumer
  */
-@Disabled
 public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport {
 
     @Test

Reply via email to