This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 078a61f8eac0 CAMEL-23129: Fix seda virtual threads hang. Add also
shutdown timeout… (#21718)
078a61f8eac0 is described below
commit 078a61f8eac092f9fabfd745feb7f02a99668dee
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 5 14:00:21 2026 +0100
CAMEL-23129: Fix seda virtual threads hang. Add also shutdown timeout…
(#21718)
* CAMEL-23129: Fix seda virtual threads hang. Add also shutdown timeout for
non vitual thread for just in case.
* Use shutdown strategy timeout and await active tasks during shutdown
- Replace hardcoded 30s timeout in SedaConsumer.prepareShutdown() with
the configurable shutdown strategy timeout from CamelContext
- Add prepareShutdown() override in ThreadPerTaskSedaConsumer to await
completion of in-flight tasks dispatched to the task executor
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Guillaume Nodet <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../apache/camel/component/seda/SedaConsumer.java | 24 ++++++++++++++++------
.../component/seda/ThreadPerTaskSedaConsumer.java | 24 ++++++++++++++++++++++
.../seda/ThreadPerTaskSedaConsumerTest.java | 5 +++--
.../ManagedVirtualThreadExecutorTest.java | 5 -----
4 files changed, 45 insertions(+), 13 deletions(-)
diff --git
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 845fd9045513..9432ed7860bc 100644
---
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -110,9 +110,14 @@ public class SedaConsumer extends DefaultConsumer
implements Runnable, ShutdownA
if (latch != null) {
LOG.debug("Preparing to shutdown, waiting for {} consumer threads
to complete.", latch.getCount());
- // wait for all threads to end
+ // wait for all threads to end, using the shutdown strategy timeout
try {
- latch.await();
+ long timeout =
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+ TimeUnit timeUnit =
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+ if (!latch.await(timeout, timeUnit)) {
+ LOG.warn("Timeout waiting for {} consumer threads to
complete during shutdown (waited {} {}).",
+ latch.getCount(), timeout, timeUnit);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -141,8 +146,10 @@ public class SedaConsumer extends DefaultConsumer
implements Runnable, ShutdownA
doRun();
} finally {
taskCount.decrementAndGet();
- latch.countDown();
- LOG.debug("Ending this polling consumer thread, there are still {}
consumer threads left.", latch.getCount());
+ if (latch != null) {
+ latch.countDown();
+ LOG.debug("Ending this polling consumer thread, there are
still {} consumer threads left.", latch.getCount());
+ }
}
}
@@ -348,7 +355,9 @@ public class SedaConsumer extends DefaultConsumer
implements Runnable, ShutdownA
@Override
protected void doStart() throws Exception {
super.doStart();
- latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+ if (getEndpoint().getConcurrentConsumers() > 0) {
+ latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
+ }
shutdownPending = false;
forceShutdown = false;
@@ -421,7 +430,10 @@ public class SedaConsumer extends DefaultConsumer
implements Runnable, ShutdownA
// submit needed number of tasks
int tasks = poolSize - taskCount.get();
- LOG.debug("Creating {} consumer tasks with poll timeout {} ms.",
tasks, pollTimeout);
+ if (tasks <= 0) {
+ tasks = 1;
+ }
+ LOG.debug("Creating {} queue pooler with poll timeout {} ms.", tasks,
pollTimeout);
for (int i = 0; i < tasks; i++) {
executor.execute(this);
}
diff --git
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
index 0f190a414742..d41c09e17f7d 100644
---
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
+++
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
@@ -145,6 +145,30 @@ public class ThreadPerTaskSedaConsumer extends
SedaConsumer {
});
}
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // Let the parent handle the coordinator thread shutdown first
+ super.prepareShutdown(suspendOnly, forced);
+
+ // Then wait for any in-flight tasks dispatched to the task executor
+ if (!suspendOnly && taskExecutor != null) {
+ long activeCount = activeTasks.sum();
+ if (activeCount > 0) {
+ LOG.debug("Waiting for {} active tasks to complete.",
activeCount);
+ long timeout =
getEndpoint().getCamelContext().getShutdownStrategy().getTimeout();
+ TimeUnit timeUnit =
getEndpoint().getCamelContext().getShutdownStrategy().getTimeUnit();
+ try {
+ taskExecutor.shutdown();
+ if (!taskExecutor.awaitTermination(timeout, timeUnit)) {
+ LOG.warn("Timeout waiting for {} active tasks to
complete during shutdown.", activeTasks.sum());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
/**
* Returns the current number of active processing tasks.
*/
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
index b0f6e8c3890d..b3cf9e5af948 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java
@@ -19,13 +19,11 @@ package org.apache.camel.component.seda;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Test for the virtualThreadPerTask mode of SEDA consumer
*/
-@Disabled
public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport {
@Test
@@ -71,12 +69,15 @@ public class ThreadPerTaskSedaConsumerTest extends
ContextTestSupport {
@Override
public void configure() {
from("seda:test?virtualThreadPerTask=true")
+ .to("log:result")
.to("mock:result");
from("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2")
+ .to("log:limited")
.to("mock:limited");
from("seda:throughput?virtualThreadPerTask=true")
+ .to("log:throughput")
.to("mock:throughput");
}
};
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
index e2004883f302..e6e4d500689d 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java
@@ -25,18 +25,13 @@ import javax.management.ObjectName;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.LifecycleStrategy;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledForJreRange;
-import org.junit.jupiter.api.condition.JRE;
import static
org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@Disabled
-@EnabledForJreRange(min = JRE.JAVA_21)
public class ManagedVirtualThreadExecutorTest extends ManagementTestSupport {
private ExecutorService vte;