This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch seda in repository https://gitbox.apache.org/repos/asf/camel.git
commit b2840cc04123a7191a4f6790c035361f03c2f810 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Mar 5 12:10:25 2026 +0100 CAMEL-23129: Fix seda virtual threads hang. Add also shutdown timeout for non vitual thread for just in case. --- .../apache/camel/component/seda/SedaConsumer.java | 23 +++++++++++++++++----- .../seda/ThreadPerTaskSedaConsumerTest.java | 5 +++-- .../ManagedVirtualThreadExecutorTest.java | 5 ----- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 845fd9045513..dcc6b7296074 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -48,6 +48,8 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); + private static final long SHUTDOWN_TIMEOUT = 30000L; + private final AtomicInteger taskCount = new AtomicInteger(); private volatile CountDownLatch latch; private volatile boolean shutdownPending; @@ -112,7 +114,11 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA // wait for all threads to end try { - latch.await(); + boolean zero = latch.await(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!zero) { + LOG.warn("Timeout {}ms preparing to shutdown, waiting for {} consumer threads to complete.", + SHUTDOWN_TIMEOUT, latch.getCount()); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -141,8 +147,10 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA doRun(); } finally { taskCount.decrementAndGet(); - latch.countDown(); - LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount()); + if (latch != null) { + latch.countDown(); + LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount()); + } } } @@ -348,7 +356,9 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA @Override protected void doStart() throws Exception { super.doStart(); - latch = new CountDownLatch(getEndpoint().getConcurrentConsumers()); + if (getEndpoint().getConcurrentConsumers() > 0) { + latch = new CountDownLatch(getEndpoint().getConcurrentConsumers()); + } shutdownPending = false; forceShutdown = false; @@ -421,7 +431,10 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA // submit needed number of tasks int tasks = poolSize - taskCount.get(); - LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout); + if (tasks <= 0) { + tasks = 1; + } + LOG.debug("Creating {} queue pooler with poll timeout {} ms.", tasks, pollTimeout); for (int i = 0; i < tasks; i++) { executor.execute(this); } diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java index b0f6e8c3890d..b3cf9e5af948 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java @@ -19,13 +19,11 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** * Test for the virtualThreadPerTask mode of SEDA consumer */ -@Disabled public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport { @Test @@ -71,12 +69,15 @@ public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport { @Override public void configure() { from("seda:test?virtualThreadPerTask=true") + .to("log:result") .to("mock:result"); from("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2") + .to("log:limited") .to("mock:limited"); from("seda:throughput?virtualThreadPerTask=true") + .to("log:throughput") .to("mock:throughput"); } }; diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java index e2004883f302..e6e4d500689d 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java @@ -25,18 +25,13 @@ import javax.management.ObjectName; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.LifecycleStrategy; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledForJreRange; -import org.junit.jupiter.api.condition.JRE; import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled -@EnabledForJreRange(min = JRE.JAVA_21) public class ManagedVirtualThreadExecutorTest extends ManagementTestSupport { private ExecutorService vte;
