This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch fix/seda-virtual-thread-shutdown in repository https://gitbox.apache.org/repos/asf/camel.git
commit a82a708277028645375fc3dc1a278193d548f79b Author: Guillaume Nodet <[email protected]> AuthorDate: Thu Mar 5 12:29:43 2026 +0100 CAMEL-23129: Fix SEDA virtual thread shutdown hang Three issues caused the SEDA consumer to hang indefinitely during shutdown when using virtualThreadPerTask mode: 1. SedaConsumer.prepareShutdown() called latch.await() with no timeout. Now uses the shutdown strategy timeout to prevent indefinite hangs. 2. ThreadPerTaskSedaConsumer inherited the parent's latch count which was set to concurrentConsumers (0 for unlimited virtual threads). With count=0, no coordinator task was submitted so messages were never consumed. Now overrides getLatchCount() to return 1 (the coordinator thread) and setupTasks() to always start exactly one coordinator regardless of the concurrentConsumers setting. 3. ThreadPerTaskSedaConsumer did not wait for active processing tasks during shutdown. Now overrides prepareShutdown() to also await termination of the task executor after the coordinator finishes. Also re-enables ThreadPerTaskSedaConsumerTest (was @Disabled). Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../apache/camel/component/seda/SedaConsumer.java | 31 ++++++++++++++--- .../component/seda/ThreadPerTaskSedaConsumer.java | 40 ++++++++++++++++++++-- .../seda/ThreadPerTaskSedaConsumerTest.java | 2 -- 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 845fd9045513..cdb96f7fd2b9 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -49,7 +49,7 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); private final AtomicInteger taskCount = new AtomicInteger(); - private volatile CountDownLatch latch; + protected volatile CountDownLatch latch; private volatile boolean shutdownPending; private volatile boolean forceShutdown; private ExecutorService executor; @@ -108,11 +108,17 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA forceShutdown = forced; if (latch != null) { - LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); + long count = latch.getCount(); + LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", count); - // wait for all threads to end + // wait for all threads to end with a timeout to prevent indefinite hangs try { - latch.await(); + long timeout = getEndpoint().getCamelContext().getShutdownStrategy().getTimeout(); + TimeUnit timeUnit = getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit(); + if (!latch.await(timeout, timeUnit)) { + LOG.warn("Timeout waiting for {} consumer threads to complete during shutdown (waited {} {}).", + latch.getCount(), timeout, timeUnit); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -348,7 +354,7 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA @Override protected void doStart() throws Exception { super.doStart(); - latch = new CountDownLatch(getEndpoint().getConcurrentConsumers()); + latch = new CountDownLatch(getLatchCount()); shutdownPending = false; forceShutdown = false; @@ -356,6 +362,14 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA getEndpoint().onStarted(this); } + /** + * Returns the number of consumer threads expected for the shutdown latch. Subclasses can override this to provide a + * different count (e.g., thread-per-task mode only needs 1 for the coordinator). + */ + protected int getLatchCount() { + return getEndpoint().getConcurrentConsumers(); + } + @Override protected void doSuspend() throws Exception { getEndpoint().onStopped(this); @@ -385,6 +399,13 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA shutdownExecutor(); } + /** + * Sets the executor service used for consumer threads. Subclasses can use this to provide their own executor. + */ + protected void setExecutor(ExecutorService executor) { + this.executor = executor; + } + /** * Shuts down the executor service. */ diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java index 0f190a414742..f18b951d03d8 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java @@ -60,6 +60,13 @@ public class ThreadPerTaskSedaConsumer extends SedaConsumer { this.maxConcurrentTasks = endpoint.getConcurrentConsumers(); } + @Override + protected int getLatchCount() { + // Thread-per-task mode uses exactly 1 coordinator thread, + // regardless of the concurrentConsumers setting (which is a concurrency limit, not a thread count) + return 1; + } + @Override protected ExecutorService createExecutor(int poolSize) { // Create a single-thread executor for the coordinator @@ -80,13 +87,42 @@ public class ThreadPerTaskSedaConsumer extends SedaConsumer { LOG.debug("Using concurrency limit of {} for thread-per-task consumer", maxConcurrentTasks); } - // Call parent to create the coordinator executor and start it - super.setupTasks(); + // Create the coordinator executor and start exactly one coordinator task. + // We cannot use super.setupTasks() because it uses concurrentConsumers as the + // task count, but for thread-per-task mode we always need exactly 1 coordinator. + ExecutorService executor = createExecutor(1); + setExecutor(executor); + LOG.debug("Creating 1 coordinator task with poll timeout {} ms.", pollTimeout); + executor.execute(this); LOG.info("Started thread-per-task SEDA consumer for {} (maxConcurrent={})", getEndpoint().getEndpointUri(), maxConcurrentTasks > 0 ? maxConcurrentTasks : "unlimited"); } + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + // First wait for the coordinator to finish (parent handles latch) + super.prepareShutdown(suspendOnly, forced); + + // Then wait for any active tasks to complete + if (!suspendOnly && taskExecutor != null) { + long activeCount = activeTasks.sum(); + if (activeCount > 0) { + LOG.debug("Waiting for {} active tasks to complete.", activeCount); + long timeout = getEndpoint().getCamelContext().getShutdownStrategy().getTimeout(); + TimeUnit timeUnit = getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit(); + try { + taskExecutor.shutdown(); + if (!taskExecutor.awaitTermination(timeout, timeUnit)) { + LOG.warn("Timeout waiting for {} active tasks to complete during shutdown.", activeTasks.sum()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + @Override protected void shutdownExecutor() { super.shutdownExecutor(); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java index b0f6e8c3890d..f1725fdc6377 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java @@ -19,13 +19,11 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** * Test for the virtualThreadPerTask mode of SEDA consumer */ -@Disabled public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport { @Test
