This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch seda
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b2840cc04123a7191a4f6790c035361f03c2f810
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 5 12:10:25 2026 +0100

    CAMEL-23129: Fix seda virtual threads hang. Also add a shutdown timeout for 
non-virtual threads, just in case.
---
 .../apache/camel/component/seda/SedaConsumer.java  | 23 +++++++++++++++++-----
 .../seda/ThreadPerTaskSedaConsumerTest.java        |  5 +++--
 .../ManagedVirtualThreadExecutorTest.java          |  5 -----
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 845fd9045513..dcc6b7296074 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -48,6 +48,8 @@ public class SedaConsumer extends DefaultConsumer implements 
Runnable, ShutdownA
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SedaConsumer.class);
 
+    private static final long SHUTDOWN_TIMEOUT = 30000L;
+
     private final AtomicInteger taskCount = new AtomicInteger();
     private volatile CountDownLatch latch;
     private volatile boolean shutdownPending;
@@ -112,7 +114,11 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
 
             // wait for all threads to end
             try {
-                latch.await();
+                boolean zero = latch.await(SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+                if (!zero) {
+                    LOG.warn("Timeout {}ms preparing to shutdown, waiting for 
{} consumer threads to complete.",
+                            SHUTDOWN_TIMEOUT, latch.getCount());
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -141,8 +147,10 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
             doRun();
         } finally {
             taskCount.decrementAndGet();
-            latch.countDown();
-            LOG.debug("Ending this polling consumer thread, there are still {} 
consumer threads left.", latch.getCount());
+            if (latch != null) {
+                latch.countDown();
+                LOG.debug("Ending this polling consumer thread, there are 
still {} consumer threads left.", latch.getCount());
+            }
         }
     }
 
@@ -348,7 +356,9 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+        if (getEndpoint().getConcurrentConsumers() > 0) {
+            latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+        }
         shutdownPending = false;
         forceShutdown = false;
 
@@ -421,7 +431,10 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
 
         // submit needed number of tasks
         int tasks = poolSize - taskCount.get();
-        LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", 
tasks, pollTimeout);
+        if (tasks <= 0) {
+            tasks = 1;
+        }
+        LOG.debug("Creating {} queue pooler with poll timeout {} ms.", tasks, 
pollTimeout);
         for (int i = 0; i < tasks; i++) {
             executor.execute(this);
         }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
index b0f6e8c3890d..b3cf9e5af948 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
@@ -19,13 +19,11 @@ package org.apache.camel.component.seda;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test for the virtualThreadPerTask mode of SEDA consumer
  */
-@Disabled
 public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport {
 
     @Test
@@ -71,12 +69,15 @@ public class ThreadPerTaskSedaConsumerTest extends 
ContextTestSupport {
             @Override
             public void configure() {
                 from("seda:test?virtualThreadPerTask=true")
+                        .to("log:result")
                         .to("mock:result");
 
                 
from("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2")
+                        .to("log:limited")
                         .to("mock:limited");
 
                 from("seda:throughput?virtualThreadPerTask=true")
+                        .to("log:throughput")
                         .to("mock:throughput");
             }
         };
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
index e2004883f302..e6e4d500689d 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
@@ -25,18 +25,13 @@ import javax.management.ObjectName;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.LifecycleStrategy;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledForJreRange;
-import org.junit.jupiter.api.condition.JRE;
 
 import static 
org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Disabled
-@EnabledForJreRange(min = JRE.JAVA_21)
 public class ManagedVirtualThreadExecutorTest extends ManagementTestSupport {
 
     private ExecutorService vte;

Reply via email to