This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b1a2770cba1 CAMEL-22957: Support virtual threads
3b1a2770cba1 is described below

commit 3b1a2770cba1a47d8a3ffbc0a8ee0e8dc0d52cc5
Author: Croway <[email protected]>
AuthorDate: Wed Feb 4 15:22:25 2026 +0100

    CAMEL-22957: Support virtual threads
---
 .../camel/component/docling/DoclingProducer.java   | 17 ++--------
 .../docling/integration/OcrExtractionIT.java       |  3 +-
 .../atmosphere/websocket/WebsocketProducer.java    | 19 +++++++++--
 ...sk.java => EventHubsCheckpointUpdaterTask.java} | 23 +++++++++++---
 .../azure/eventhubs/EventHubsConsumer.java         | 37 +++++++++++++++-------
 ...ava => EventHubsCheckpointUpdaterTaskTest.java} | 27 +++++++++++++---
 .../component/mllp/MllpTcpServerConsumer.java      | 30 ++++++++++--------
 .../component/salesforce/SalesforceHttpClient.java | 13 ++++++--
 .../org/apache/camel/component/smpp/SmppUtils.java |  3 +-
 .../group/DefaultGroupFactoryStrategy.java         |  4 +--
 .../group/DefaultManagedGroupFactory.java          | 30 +++++++++++++++---
 .../group/internal/ZooKeeperGroup.java             | 25 ++++++++++++---
 .../group/internal/ZooKeeperGroupFactory.java      |  5 +++
 .../group/internal/ZooKeeperMultiGroup.java        | 10 ++++++
 14 files changed, 180 insertions(+), 66 deletions(-)

diff --git 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index 7f8ad1a455c5..f32943cef3d1 100644
--- 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++ 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -566,7 +565,8 @@ public class DoclingProducer extends DefaultProducer {
         BatchProcessingResults results = new BatchProcessingResults();
         results.setStartTimeMs(System.currentTimeMillis());
 
-        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+        ExecutorService executor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newFixedThreadPool(this, "DoclingBatch", parallelism);
         AtomicInteger index = new AtomicInteger(0);
         AtomicBoolean shouldCancel = new AtomicBoolean(false);
 
@@ -670,18 +670,7 @@ public class DoclingProducer extends DefaultProducer {
             }
 
         } finally {
-            executor.shutdown();
-            try {
-                // Allow 10 seconds grace period for executor shutdown
-                long shutdownTimeout = Math.max(10000, batchTimeout / 10);
-                if (!executor.awaitTermination(shutdownTimeout, 
TimeUnit.MILLISECONDS)) {
-                    LOG.warn("Executor did not terminate within {}ms, forcing 
shutdown", shutdownTimeout);
-                    executor.shutdownNow();
-                }
-            } catch (InterruptedException e) {
-                executor.shutdownNow();
-                Thread.currentThread().interrupt();
-            }
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
         }
 
         results.setEndTimeMs(System.currentTimeMillis());
diff --git 
a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
 
b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
index 59613b16b3c3..7d33075c2a9a 100644
--- 
a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
+++ 
b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
@@ -126,7 +126,8 @@ class OcrExtractionIT extends CamelTestSupport {
         String result = template.requestBody("direct:ocr-convert-json", 
testImage.toString(), String.class);
 
         
assertThatJson(result).node("schema_name").asString().isEqualTo("DoclingDocument");
-        assertThatJson(result).inPath("texts[*].text").isArray().contains("OCR 
Test Document");
+        // OCR may combine adjacent lines - check for text containing the 
expected phrase
+        assertThat(result).contains(TEST_TEXT_LINE3);
 
         checkExtractedText(result);
 
diff --git 
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
 
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
index 18048034a8f7..637d24de84b7 100644
--- 
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
+++ 
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -37,12 +36,28 @@ import org.slf4j.LoggerFactory;
 public class WebsocketProducer extends DefaultProducer {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(WebsocketProducer.class);
 
-    private static ExecutorService executor = 
Executors.newSingleThreadExecutor();
+    private ExecutorService executor;
 
     public WebsocketProducer(WebsocketEndpoint endpoint) {
         super(endpoint);
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newSingleThreadExecutor(this, "WebsocketProducer");
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (executor != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
+            executor = null;
+        }
+        super.doStop();
+    }
+
     @Override
     public WebsocketEndpoint getEndpoint() {
         return (WebsocketEndpoint) super.getEndpoint();
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
similarity index 77%
rename from 
components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
rename to 
components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
index 4b3ab42ab704..49723ac7c4f7 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
@@ -16,21 +16,24 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
-import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.azure.messaging.eventhubs.models.EventContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EventHubsCheckpointUpdaterTimerTask extends TimerTask {
+/**
+ * A task for updating Azure Event Hubs checkpoints using 
ScheduledExecutorService.
+ */
+public class EventHubsCheckpointUpdaterTask implements Runnable {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsCheckpointUpdaterTimerTask.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsCheckpointUpdaterTask.class);
 
     private EventContext eventContext;
     private final AtomicInteger processedEvents;
+    private volatile long scheduledTime;
 
-    public EventHubsCheckpointUpdaterTimerTask(EventContext eventContext, 
AtomicInteger processedEvents) {
+    public EventHubsCheckpointUpdaterTask(EventContext eventContext, 
AtomicInteger processedEvents) {
         this.eventContext = eventContext;
         this.processedEvents = processedEvents;
     }
@@ -54,4 +57,16 @@ public class EventHubsCheckpointUpdaterTimerTask extends 
TimerTask {
     public void setEventContext(EventContext eventContext) {
         this.eventContext = eventContext;
     }
+
+    public void setScheduledTime(long scheduledTime) {
+        this.scheduledTime = scheduledTime;
+    }
+
+    public long getScheduledTime() {
+        return scheduledTime;
+    }
+
+    public boolean isExpired() {
+        return System.currentTimeMillis() > scheduledTime;
+    }
 }
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index af425ddee6cc..58dc787dd88c 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
-import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.azure.messaging.eventhubs.EventProcessorClient;
@@ -43,21 +45,24 @@ public class EventHubsConsumer extends DefaultConsumer {
     private EventProcessorClient processorClient;
 
     private final AtomicInteger processedEvents;
-    private final Timer timer;
-
-    private EventHubsCheckpointUpdaterTimerTask lastTask;
+    private ScheduledExecutorService scheduledExecutorService;
+    private ScheduledFuture<?> lastScheduledTask;
+    private EventHubsCheckpointUpdaterTask lastTask;
 
     public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor 
processor) {
         super(endpoint, processor);
 
         this.processedEvents = new AtomicInteger();
-        this.timer = new Timer();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
+        // create scheduled executor for checkpoint updates
+        scheduledExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newScheduledThreadPool(this, "EventHubsCheckpoint", 1);
+
         // create the client
         processorClient = 
EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
                 this::onEventListener, this::onErrorListener);
@@ -74,6 +79,12 @@ public class EventHubsConsumer extends DefaultConsumer {
             processorClient = null;
         }
 
+        // shutdown scheduled executor
+        if (scheduledExecutorService != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
+
         // shutdown camel consumer
         super.doStop();
     }
@@ -159,10 +170,12 @@ public class EventHubsConsumer extends DefaultConsumer {
      * @param exchange the exchange
      */
     private void processCommit(final Exchange exchange, final EventContext 
eventContext) {
-        if (lastTask == null || System.currentTimeMillis() > 
lastTask.scheduledExecutionTime()) {
-            lastTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, 
processedEvents);
+        if (lastTask == null || lastTask.isExpired()) {
+            lastTask = new EventHubsCheckpointUpdaterTask(eventContext, 
processedEvents);
             // delegate the checkpoint update to a dedicated Thread
-            timer.schedule(lastTask, 
getConfiguration().getCheckpointBatchTimeout());
+            long timeout = getConfiguration().getCheckpointBatchTimeout();
+            lastTask.setScheduledTime(System.currentTimeMillis() + timeout);
+            lastScheduledTask = scheduledExecutorService.schedule(lastTask, 
timeout, TimeUnit.MILLISECONDS);
         } else {
             // updates the eventContext to use for the offset to be the most 
accurate
             lastTask.setEventContext(eventContext);
@@ -174,8 +187,8 @@ public class EventHubsConsumer extends DefaultConsumer {
                 processedEvents.set(0);
                 
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, 
COMPLETED_BY_SIZE);
                 LOG.debug("eventhub consumer batch size of reached");
-                if (lastTask != null) {
-                    lastTask.cancel();
+                if (lastScheduledTask != null) {
+                    lastScheduledTask.cancel(false);
                 }
                 eventContext.updateCheckpointAsync()
                         .subscribe(unused -> LOG.debug("Processed one 
event..."),
@@ -183,14 +196,14 @@ public class EventHubsConsumer extends DefaultConsumer {
                                 () -> {
                                     LOG.debug("Checkpoint updated.");
                                 });
-            } else if (System.currentTimeMillis() >= 
lastTask.scheduledExecutionTime()) {
+            } else if (lastTask != null && lastTask.isExpired()) {
                 
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY, 
COMPLETED_BY_TIMEOUT);
                 LOG.debug("eventhub consumer batch timeout reached");
             } else {
                 LOG.debug("neither eventhub consumer batch size of {}/{} nor 
batch timeout reached yet", cnt,
                         getConfiguration().getCheckpointBatchSize());
             }
-            // we assume that the timer task has done the update by its side
+            // we assume that the scheduled task has done the update by its 
side
         } catch (Exception ex) {
             getExceptionHandler().handleException("Error occurred during 
updating the checkpoint. This exception is ignored.",
                     exchange, ex);
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
similarity index 66%
rename from 
components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
rename to 
components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
index e98c949c1e00..c7f6519fa836 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
@@ -24,8 +24,10 @@ import org.mockito.Mockito;
 import reactor.core.publisher.Mono;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class EventHubsCheckpointUpdaterTimerTaskTest {
+class EventHubsCheckpointUpdaterTaskTest {
 
     @Test
     void testProcessedEventsResetWhenCheckpointUpdated() {
@@ -35,9 +37,9 @@ class EventHubsCheckpointUpdaterTimerTaskTest {
         Mockito.when(eventContext.updateCheckpointAsync())
                 .thenReturn(Mono.just("").then());
 
-        var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, 
processedEvents);
+        var task = new EventHubsCheckpointUpdaterTask(eventContext, 
processedEvents);
 
-        timerTask.run();
+        task.run();
 
         assertEquals(0, processedEvents.get());
     }
@@ -50,11 +52,26 @@ class EventHubsCheckpointUpdaterTimerTaskTest {
         Mockito.when(eventContext.updateCheckpointAsync())
                 .thenReturn(Mono.error(new RuntimeException()));
 
-        var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext, 
processedEvents);
+        var task = new EventHubsCheckpointUpdaterTask(eventContext, 
processedEvents);
 
-        timerTask.run();
+        task.run();
 
         assertEquals(1, processedEvents.get());
     }
 
+    @Test
+    void testIsExpired() {
+        var processedEvents = new AtomicInteger(0);
+        var eventContext = Mockito.mock(EventContext.class);
+        var task = new EventHubsCheckpointUpdaterTask(eventContext, 
processedEvents);
+
+        // Set scheduled time in the past
+        task.setScheduledTime(System.currentTimeMillis() - 1000);
+        assertTrue(task.isExpired());
+
+        // Set scheduled time in the future
+        task.setScheduledTime(System.currentTimeMillis() + 10000);
+        assertFalse(task.isExpired());
+    }
+
 }
diff --git 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 8ab1e9884137..1c94496b65b2 100644
--- 
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ 
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -28,11 +28,7 @@ import java.security.cert.X509Certificate;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -66,12 +62,12 @@ import org.slf4j.LoggerFactory;
 @ManagedResource(description = "MLLP Producer")
 public class MllpTcpServerConsumer extends DefaultConsumer {
     final Logger log;
-    final ExecutorService validationExecutor;
-    final ExecutorService consumerExecutor;
     final Charset charset;
     final Hl7Util hl7Util;
     final boolean logPhi;
 
+    ExecutorService validationExecutor;
+    ExecutorService consumerExecutor;
     TcpServerBindThread bindThread;
     TcpServerAcceptThread acceptThread;
 
@@ -84,11 +80,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         MllpComponent component = endpoint.getComponent();
         this.logPhi = component.getLogPhi();
         hl7Util = new Hl7Util(component.getLogPhiMaxBytes(), logPhi);
-
-        validationExecutor = Executors.newCachedThreadPool();
-        consumerExecutor = new ThreadPoolExecutor(
-                1, getConfiguration().getMaxConcurrentConsumers(), 
getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS,
-                new SynchronousQueue<>());
     }
 
     @Override
@@ -158,6 +149,13 @@ public class MllpTcpServerConsumer extends DefaultConsumer 
{
 
     @Override
     protected void doStart() throws Exception {
+        // Create executor services using Camel's ExecutorServiceManager for 
virtual threads support
+        validationExecutor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newCachedThreadPool(this, "MllpValidation");
+        consumerExecutor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newThreadPool(this, "MllpConsumer",
+                        1, getConfiguration().getMaxConcurrentConsumers());
+
         if (bindThread == null || !bindThread.isAlive()) {
             bindThread = new TcpServerBindThread(this, 
getEndpoint().getSslContextParameters());
 
@@ -180,11 +178,17 @@ public class MllpTcpServerConsumer extends 
DefaultConsumer {
     @Override
     protected void doShutdown() throws Exception {
         super.doShutdown();
-        consumerExecutor.shutdownNow();
+        if (consumerExecutor != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(consumerExecutor);
+            consumerExecutor = null;
+        }
         if (acceptThread != null) {
             acceptThread.interrupt();
         }
-        validationExecutor.shutdownNow();
+        if (validationExecutor != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(validationExecutor);
+            validationExecutor = null;
+        }
     }
 
     public void handleMessageTimeout(String message, byte[] payload, Throwable 
cause) {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
index e4f37fb54418..53c13d9e7037 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
@@ -57,12 +56,20 @@ public class SalesforceHttpClient extends HttpClient {
 
     private final ExecutorService workerPool;
 
+    /**
+     * Creates a SalesforceHttpClient without a worker pool. This constructor 
is intended for testing purposes only.
+     * Production code should use the constructor that accepts a CamelContext 
and ExecutorService.
+     */
     public SalesforceHttpClient() {
-        this(null);
+        this(null, null, null);
     }
 
+    /**
+     * Creates a SalesforceHttpClient without a worker pool. This constructor 
is intended for testing purposes only.
+     * Production code should use the constructor that accepts a CamelContext 
and ExecutorService.
+     */
     public SalesforceHttpClient(SslContextFactory.Client sslContextFactory) {
-        this(null, Executors.newCachedThreadPool(), sslContextFactory);
+        this(null, null, sslContextFactory);
     }
 
     public SalesforceHttpClient(CamelContext context, ExecutorService 
workerPool, SslContextFactory.Client sslContextFactory) {
diff --git 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index f8ec4c298133..8276d826064f 100644
--- 
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ 
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -315,7 +315,8 @@ public final class SmppUtils {
             ExecutorServiceManager manager = 
endpoint.getCamelContext().getExecutorServiceManager();
             return manager.newSingleThreadScheduledExecutor(service, taskName);
         } else {
-            LOG.warn("Not using the Camel scheduled thread executor");
+            LOG.warn("CamelContext or ExecutorServiceManager not available. "
+                     + "Using fallback executor which does not support virtual 
threads.");
             return Executors.newSingleThreadScheduledExecutor();
         }
     }
diff --git 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
index 81d18585cdba..a82572f93f87 100644
--- 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
+++ 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
@@ -33,9 +33,9 @@ public class DefaultGroupFactoryStrategy implements 
ManagedGroupFactoryStrategy
             throws Exception {
 
         if (curator != null) {
-            return new DefaultManagedGroupFactory(curator, false);
+            return new DefaultManagedGroupFactory(curator, false, 
camelContext);
         } else {
-            return new DefaultManagedGroupFactory(factory.call(), true);
+            return new DefaultManagedGroupFactory(factory.call(), true, 
camelContext);
         }
     }
 
diff --git 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
index 12a4beb645b8..43d1a18edd4a 100644
--- 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
+++ 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.component.zookeepermaster.group;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.camel.CamelContext;
 import 
org.apache.camel.component.zookeepermaster.group.internal.ZooKeeperGroup;
 import 
org.apache.camel.component.zookeepermaster.group.internal.ZooKeeperMultiGroup;
 import org.apache.curator.framework.CuratorFramework;
@@ -26,10 +28,16 @@ public class DefaultManagedGroupFactory implements 
ManagedGroupFactory {
 
     private final CuratorFramework curator;
     private final boolean shouldClose;
+    private final CamelContext camelContext;
 
     public DefaultManagedGroupFactory(CuratorFramework curator, boolean 
shouldClose) {
+        this(curator, shouldClose, null);
+    }
+
+    public DefaultManagedGroupFactory(CuratorFramework curator, boolean 
shouldClose, CamelContext camelContext) {
         this.curator = curator;
         this.shouldClose = shouldClose;
+        this.camelContext = camelContext;
     }
 
     @Override
@@ -39,22 +47,36 @@ public class DefaultManagedGroupFactory implements 
ManagedGroupFactory {
 
     @Override
     public <T extends NodeState> Group<T> createGroup(String path, Class<T> 
clazz) {
-        return new ZooKeeperGroup<>(curator, path, clazz);
+        ExecutorService executorService = createExecutorService(path);
+        return new ZooKeeperGroup<>(curator, path, clazz, executorService);
     }
 
     @Override
     public <T extends NodeState> Group<T> createGroup(String path, Class<T> 
clazz, ThreadFactory threadFactory) {
-        return new ZooKeeperGroup<>(curator, path, clazz, threadFactory);
+        ExecutorService executorService = createExecutorService(path);
+        return new ZooKeeperGroup<>(curator, path, clazz, executorService);
     }
 
     @Override
     public <T extends NodeState> Group<T> createMultiGroup(String path, 
Class<T> clazz) {
-        return new ZooKeeperMultiGroup<>(curator, path, clazz);
+        ExecutorService executorService = createExecutorService(path);
+        return new ZooKeeperMultiGroup<>(curator, path, clazz, 
executorService);
     }
 
     @Override
     public <T extends NodeState> Group<T> createMultiGroup(String path, 
Class<T> clazz, ThreadFactory threadFactory) {
-        return new ZooKeeperMultiGroup<>(curator, path, clazz, threadFactory);
+        ExecutorService executorService = createExecutorService(path);
+        return new ZooKeeperMultiGroup<>(curator, path, clazz, 
executorService);
+    }
+
+    private ExecutorService createExecutorService(String path) {
+        if (camelContext == null) {
+            throw new IllegalStateException(
+                    "CamelContext is required to create ExecutorService for 
virtual threads support. "
+                                            + "Use the constructor that 
accepts CamelContext.");
+        }
+        return camelContext.getExecutorServiceManager()
+                .newSingleThreadExecutor(this, "ZooKeeperGroup-" + path);
     }
 
     @Override
diff --git 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
index 8fd265340339..d251114fc106 100644
--- 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
+++ 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
@@ -120,20 +120,30 @@ public class ZooKeeperGroup<T extends NodeState> 
implements Group<T> {
             = (CuratorFramework client, ConnectionState newState) -> 
handleStateChange(newState);
 
     /**
-     * @param client the client
-     * @param path   path to watch
+     * @param      client the client
+     * @param      path   path to watch
+     * @deprecated        Use the constructor that accepts an ExecutorService 
from ExecutorServiceManager for virtual
+     *                    threads support.
      */
+    @Deprecated
     public ZooKeeperGroup(CuratorFramework client, String path, Class<T> 
clazz) {
         this(client, path, clazz, Executors.newSingleThreadExecutor());
+        LOG.warn("Using deprecated ZooKeeperGroup constructor without 
ExecutorService. "
+                 + "This does not support virtual threads. Use 
ManagedGroupFactory.createGroup() instead.");
     }
 
     /**
-     * @param client        the client
-     * @param path          path to watch
-     * @param threadFactory factory to use when creating internal threads
+     * @param      client        the client
+     * @param      path          path to watch
+     * @param      threadFactory factory to use when creating internal threads
+     * @deprecated               Use the constructor that accepts an 
ExecutorService from ExecutorServiceManager for
+     *                           virtual threads support.
      */
+    @Deprecated
     public ZooKeeperGroup(CuratorFramework client, String path, Class<T> 
clazz, ThreadFactory threadFactory) {
         this(client, path, clazz, 
Executors.newSingleThreadExecutor(threadFactory));
+        LOG.warn("Using deprecated ZooKeeperGroup constructor with 
ThreadFactory. "
+                 + "This does not support virtual threads. Use 
ManagedGroupFactory.createGroup() instead.");
     }
 
     /**
@@ -143,6 +153,11 @@ public class ZooKeeperGroup<T extends NodeState> 
implements Group<T> {
      */
     public ZooKeeperGroup(CuratorFramework client, String path, Class<T> 
clazz, final ExecutorService executorService) {
         LOG.info("Creating ZK Group for path \"{}\"", path);
+        if (executorService == null) {
+            throw new IllegalArgumentException(
+                    "ExecutorService must not be null. "
+                                               + "Use ExecutorServiceManager 
to create an ExecutorService for virtual threads support.");
+        }
         this.client = client;
         this.path = path;
         this.clazz = clazz;
diff --git 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
index 5d2953e31e94..518b4e27566c 100644
--- 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
+++ 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
@@ -23,6 +23,11 @@ import 
org.apache.camel.component.zookeepermaster.group.GroupFactory;
 import org.apache.camel.component.zookeepermaster.group.NodeState;
 import org.apache.curator.framework.CuratorFramework;
 
+/**
+ * @deprecated Use {@link 
org.apache.camel.component.zookeepermaster.group.DefaultManagedGroupFactory} 
which supports
+ *             virtual threads via ExecutorServiceManager.
+ */
+@Deprecated
 public class ZooKeeperGroupFactory implements GroupFactory {
 
     private CuratorFramework curator;
diff --git 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
index ce3555c4a9d0..4dd18bd43613 100644
--- 
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
+++ 
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
@@ -27,6 +27,11 @@ import org.apache.curator.framework.CuratorFramework;
 
 public class ZooKeeperMultiGroup<T extends NodeState> extends 
ZooKeeperGroup<T> implements MultiGroup<T> {
 
+    /**
+     * @deprecated Use the constructor that accepts an ExecutorService from 
ExecutorServiceManager for virtual threads
+     *             support.
+     */
+    @Deprecated
     public ZooKeeperMultiGroup(CuratorFramework client, String path, Class<T> 
clazz) {
         super(client, path, clazz);
     }
@@ -35,6 +40,11 @@ public class ZooKeeperMultiGroup<T extends NodeState> 
extends ZooKeeperGroup<T>
         super(client, path, clazz, executorService);
     }
 
+    /**
+     * @deprecated Use the constructor that accepts an ExecutorService from 
ExecutorServiceManager for virtual threads
+     *             support.
+     */
+    @Deprecated
     public ZooKeeperMultiGroup(CuratorFramework client, String path, Class<T> 
clazz, ThreadFactory threadFactory) {
         super(client, path, clazz, threadFactory);
     }

Reply via email to