This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 078a61f8eac0 CAMEL-23129: Fix seda virtual threads hang. Add also 
shutdown timeout… (#21718)
078a61f8eac0 is described below

commit 078a61f8eac092f9fabfd745feb7f02a99668dee
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 5 14:00:21 2026 +0100

    CAMEL-23129: Fix seda virtual threads hang. Add also shutdown timeout… 
(#21718)
    
    * CAMEL-23129: Fix seda virtual threads hang. Add also shutdown timeout for 
non vitual thread for just in case.
    
    * Use shutdown strategy timeout and await active tasks during shutdown
    
    - Replace hardcoded 30s timeout in SedaConsumer.prepareShutdown() with
      the configurable shutdown strategy timeout from CamelContext
    - Add prepareShutdown() override in ThreadPerTaskSedaConsumer to await
      completion of in-flight tasks dispatched to the task executor
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Guillaume Nodet <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../apache/camel/component/seda/SedaConsumer.java  | 24 ++++++++++++++++------
 .../component/seda/ThreadPerTaskSedaConsumer.java  | 24 ++++++++++++++++++++++
 .../seda/ThreadPerTaskSedaConsumerTest.java        |  5 +++--
 .../ManagedVirtualThreadExecutorTest.java          |  5 -----
 4 files changed, 45 insertions(+), 13 deletions(-)

diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 845fd9045513..9432ed7860bc 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -110,9 +110,14 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         if (latch != null) {
             LOG.debug("Preparing to shutdown, waiting for {} consumer threads 
to complete.", latch.getCount());
 
-            // wait for all threads to end
+            // wait for all threads to end, using the shutdown strategy timeout
             try {
-                latch.await();
+                long timeout = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+                TimeUnit timeUnit = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+                if (!latch.await(timeout, timeUnit)) {
+                    LOG.warn("Timeout waiting for {} consumer threads to 
complete during shutdown (waited {} {}).",
+                            latch.getCount(), timeout, timeUnit);
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
@@ -141,8 +146,10 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
             doRun();
         } finally {
             taskCount.decrementAndGet();
-            latch.countDown();
-            LOG.debug("Ending this polling consumer thread, there are still {} 
consumer threads left.", latch.getCount());
+            if (latch != null) {
+                latch.countDown();
+                LOG.debug("Ending this polling consumer thread, there are 
still {} consumer threads left.", latch.getCount());
+            }
         }
     }
 
@@ -348,7 +355,9 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+        if (getEndpoint().getConcurrentConsumers() > 0) {
+            latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+        }
         shutdownPending = false;
         forceShutdown = false;
 
@@ -421,7 +430,10 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
 
         // submit needed number of tasks
         int tasks = poolSize - taskCount.get();
-        LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", 
tasks, pollTimeout);
+        if (tasks <= 0) {
+            tasks = 1;
+        }
+        LOG.debug("Creating {} queue pooler with poll timeout {} ms.", tasks, 
pollTimeout);
         for (int i = 0; i < tasks; i++) {
             executor.execute(this);
         }
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
index 0f190a414742..d41c09e17f7d 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
@@ -145,6 +145,30 @@ public class ThreadPerTaskSedaConsumer extends 
SedaConsumer {
         });
     }
 
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // Let the parent handle the coordinator thread shutdown first
+        super.prepareShutdown(suspendOnly, forced);
+
+        // Then wait for any in-flight tasks dispatched to the task executor
+        if (!suspendOnly && taskExecutor != null) {
+            long activeCount = activeTasks.sum();
+            if (activeCount > 0) {
+                LOG.debug("Waiting for {} active tasks to complete.", 
activeCount);
+                long timeout = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+                TimeUnit timeUnit = 
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+                try {
+                    taskExecutor.shutdown();
+                    if (!taskExecutor.awaitTermination(timeout, timeUnit)) {
+                        LOG.warn("Timeout waiting for {} active tasks to 
complete during shutdown.", activeTasks.sum());
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
     /**
      * Returns the current number of active processing tasks.
      */
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
index b0f6e8c3890d..b3cf9e5af948 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
@@ -19,13 +19,11 @@ package org.apache.camel.component.seda;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test for the virtualThreadPerTask mode of SEDA consumer
  */
-@Disabled
 public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport {
 
     @Test
@@ -71,12 +69,15 @@ public class ThreadPerTaskSedaConsumerTest extends 
ContextTestSupport {
             @Override
             public void configure() {
                 from("seda:test?virtualThreadPerTask=true")
+                        .to("log:result")
                         .to("mock:result");
 
                 
from("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2")
+                        .to("log:limited")
                         .to("mock:limited");
 
                 from("seda:throughput?virtualThreadPerTask=true")
+                        .to("log:throughput")
                         .to("mock:throughput");
             }
         };
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
index e2004883f302..e6e4d500689d 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
@@ -25,18 +25,13 @@ import javax.management.ObjectName;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.LifecycleStrategy;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledForJreRange;
-import org.junit.jupiter.api.condition.JRE;
 
 import static 
org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Disabled
-@EnabledForJreRange(min = JRE.JAVA_21)
 public class ManagedVirtualThreadExecutorTest extends ManagementTestSupport {
 
     private ExecutorService vte;

Reply via email to