This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3b1a2770cba1 CAMEL-22957: Support virtual threads
3b1a2770cba1 is described below
commit 3b1a2770cba1a47d8a3ffbc0a8ee0e8dc0d52cc5
Author: Croway <[email protected]>
AuthorDate: Wed Feb 4 15:22:25 2026 +0100
CAMEL-22957: Support virtual threads
---
.../camel/component/docling/DoclingProducer.java | 17 ++--------
.../docling/integration/OcrExtractionIT.java | 3 +-
.../atmosphere/websocket/WebsocketProducer.java | 19 +++++++++--
...sk.java => EventHubsCheckpointUpdaterTask.java} | 23 +++++++++++---
.../azure/eventhubs/EventHubsConsumer.java | 37 +++++++++++++++-------
...ava => EventHubsCheckpointUpdaterTaskTest.java} | 27 +++++++++++++---
.../component/mllp/MllpTcpServerConsumer.java | 30 ++++++++++--------
.../component/salesforce/SalesforceHttpClient.java | 13 ++++++--
.../org/apache/camel/component/smpp/SmppUtils.java | 3 +-
.../group/DefaultGroupFactoryStrategy.java | 4 +--
.../group/DefaultManagedGroupFactory.java | 30 +++++++++++++++---
.../group/internal/ZooKeeperGroup.java | 25 ++++++++++++---
.../group/internal/ZooKeeperGroupFactory.java | 5 +++
.../group/internal/ZooKeeperMultiGroup.java | 10 ++++++
14 files changed, 180 insertions(+), 66 deletions(-)
diff --git
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index 7f8ad1a455c5..f32943cef3d1 100644
---
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -566,7 +565,8 @@ public class DoclingProducer extends DefaultProducer {
BatchProcessingResults results = new BatchProcessingResults();
results.setStartTimeMs(System.currentTimeMillis());
- ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+ ExecutorService executor =
getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newFixedThreadPool(this, "DoclingBatch", parallelism);
AtomicInteger index = new AtomicInteger(0);
AtomicBoolean shouldCancel = new AtomicBoolean(false);
@@ -670,18 +670,7 @@ public class DoclingProducer extends DefaultProducer {
}
} finally {
- executor.shutdown();
- try {
- // Allow 10 seconds grace period for executor shutdown
- long shutdownTimeout = Math.max(10000, batchTimeout / 10);
- if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
- LOG.warn("Executor did not terminate within {}ms, forcing shutdown", shutdownTimeout);
- executor.shutdownNow();
- }
- } catch (InterruptedException e) {
- executor.shutdownNow();
- Thread.currentThread().interrupt();
- }
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
}
results.setEndTimeMs(System.currentTimeMillis());
diff --git
a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
index 59613b16b3c3..7d33075c2a9a 100644
---
a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
+++
b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/integration/OcrExtractionIT.java
@@ -126,7 +126,8 @@ class OcrExtractionIT extends CamelTestSupport {
String result = template.requestBody("direct:ocr-convert-json",
testImage.toString(), String.class);
assertThatJson(result).node("schema_name").asString().isEqualTo("DoclingDocument");
- assertThatJson(result).inPath("texts[*].text").isArray().contains("OCR Test Document");
+ // OCR may combine adjacent lines - check for text containing the expected phrase
+ assertThat(result).contains(TEST_TEXT_LINE3);
checkExtractedText(result);
diff --git
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
index 18048034a8f7..637d24de84b7 100644
---
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
+++
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -37,12 +36,28 @@ import org.slf4j.LoggerFactory;
public class WebsocketProducer extends DefaultProducer {
private static final transient Logger LOG =
LoggerFactory.getLogger(WebsocketProducer.class);
- private static ExecutorService executor =
Executors.newSingleThreadExecutor();
+ private ExecutorService executor;
public WebsocketProducer(WebsocketEndpoint endpoint) {
super(endpoint);
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ executor = getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newSingleThreadExecutor(this, "WebsocketProducer");
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (executor != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
+ executor = null;
+ }
+ super.doStop();
+ }
+
@Override
public WebsocketEndpoint getEndpoint() {
return (WebsocketEndpoint) super.getEndpoint();
diff --git
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
similarity index 77%
rename from
components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
rename to
components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
index 4b3ab42ab704..49723ac7c4f7 100644
---
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTask.java
+++
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTask.java
@@ -16,21 +16,24 @@
*/
package org.apache.camel.component.azure.eventhubs;
-import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import com.azure.messaging.eventhubs.models.EventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class EventHubsCheckpointUpdaterTimerTask extends TimerTask {
+/**
+ * A task for updating Azure Event Hubs checkpoints using
ScheduledExecutorService.
+ */
+public class EventHubsCheckpointUpdaterTask implements Runnable {
- private static final Logger LOG =
LoggerFactory.getLogger(EventHubsCheckpointUpdaterTimerTask.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(EventHubsCheckpointUpdaterTask.class);
private EventContext eventContext;
private final AtomicInteger processedEvents;
+ private volatile long scheduledTime;
- public EventHubsCheckpointUpdaterTimerTask(EventContext eventContext,
AtomicInteger processedEvents) {
+ public EventHubsCheckpointUpdaterTask(EventContext eventContext,
AtomicInteger processedEvents) {
this.eventContext = eventContext;
this.processedEvents = processedEvents;
}
@@ -54,4 +57,16 @@ public class EventHubsCheckpointUpdaterTimerTask extends
TimerTask {
public void setEventContext(EventContext eventContext) {
this.eventContext = eventContext;
}
+
+ public void setScheduledTime(long scheduledTime) {
+ this.scheduledTime = scheduledTime;
+ }
+
+ public long getScheduledTime() {
+ return scheduledTime;
+ }
+
+ public boolean isExpired() {
+ return System.currentTimeMillis() > scheduledTime;
+ }
}
diff --git
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index af425ddee6cc..58dc787dd88c 100644
---
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.azure.eventhubs;
-import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.azure.messaging.eventhubs.EventProcessorClient;
@@ -43,21 +45,24 @@ public class EventHubsConsumer extends DefaultConsumer {
private EventProcessorClient processorClient;
private final AtomicInteger processedEvents;
- private final Timer timer;
-
- private EventHubsCheckpointUpdaterTimerTask lastTask;
+ private ScheduledExecutorService scheduledExecutorService;
+ private ScheduledFuture<?> lastScheduledTask;
+ private EventHubsCheckpointUpdaterTask lastTask;
public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor
processor) {
super(endpoint, processor);
this.processedEvents = new AtomicInteger();
- this.timer = new Timer();
}
@Override
protected void doStart() throws Exception {
super.doStart();
+ // create scheduled executor for checkpoint updates
+ scheduledExecutorService =
getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newScheduledThreadPool(this, "EventHubsCheckpoint", 1);
+
// create the client
processorClient =
EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
this::onEventListener, this::onErrorListener);
@@ -74,6 +79,12 @@ public class EventHubsConsumer extends DefaultConsumer {
processorClient = null;
}
+ // shutdown scheduled executor
+ if (scheduledExecutorService != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(scheduledExecutorService);
+ scheduledExecutorService = null;
+ }
+
// shutdown camel consumer
super.doStop();
}
@@ -159,10 +170,12 @@ public class EventHubsConsumer extends DefaultConsumer {
* @param exchange the exchange
*/
private void processCommit(final Exchange exchange, final EventContext
eventContext) {
- if (lastTask == null || System.currentTimeMillis() >
lastTask.scheduledExecutionTime()) {
- lastTask = new EventHubsCheckpointUpdaterTimerTask(eventContext,
processedEvents);
+ if (lastTask == null || lastTask.isExpired()) {
+ lastTask = new EventHubsCheckpointUpdaterTask(eventContext,
processedEvents);
// delegate the checkpoint update to a dedicated Thread
- timer.schedule(lastTask,
getConfiguration().getCheckpointBatchTimeout());
+ long timeout = getConfiguration().getCheckpointBatchTimeout();
+ lastTask.setScheduledTime(System.currentTimeMillis() + timeout);
+ lastScheduledTask = scheduledExecutorService.schedule(lastTask,
timeout, TimeUnit.MILLISECONDS);
} else {
// updates the eventContext to use for the offset to be the most
accurate
lastTask.setEventContext(eventContext);
@@ -174,8 +187,8 @@ public class EventHubsConsumer extends DefaultConsumer {
processedEvents.set(0);
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY,
COMPLETED_BY_SIZE);
LOG.debug("eventhub consumer batch size of reached");
- if (lastTask != null) {
- lastTask.cancel();
+ if (lastScheduledTask != null) {
+ lastScheduledTask.cancel(false);
}
eventContext.updateCheckpointAsync()
.subscribe(unused -> LOG.debug("Processed one
event..."),
@@ -183,14 +196,14 @@ public class EventHubsConsumer extends DefaultConsumer {
() -> {
LOG.debug("Checkpoint updated.");
});
- } else if (System.currentTimeMillis() >=
lastTask.scheduledExecutionTime()) {
+ } else if (lastTask != null && lastTask.isExpired()) {
exchange.getIn().setHeader(EventHubsConstants.CHECKPOINT_UPDATED_BY,
COMPLETED_BY_TIMEOUT);
LOG.debug("eventhub consumer batch timeout reached");
} else {
LOG.debug("neither eventhub consumer batch size of {}/{} nor batch timeout reached yet", cnt,
getConfiguration().getCheckpointBatchSize());
}
- // we assume that the timer task has done the update by its side
+ // we assume that the scheduled task has done the update by its side
} catch (Exception ex) {
getExceptionHandler().handleException("Error occurred during
updating the checkpoint. This exception is ignored.",
exchange, ex);
diff --git
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
similarity index 66%
rename from
components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
rename to
components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
index e98c949c1e00..c7f6519fa836 100644
---
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTimerTaskTest.java
+++
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsCheckpointUpdaterTaskTest.java
@@ -24,8 +24,10 @@ import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-class EventHubsCheckpointUpdaterTimerTaskTest {
+class EventHubsCheckpointUpdaterTaskTest {
@Test
void testProcessedEventsResetWhenCheckpointUpdated() {
@@ -35,9 +37,9 @@ class EventHubsCheckpointUpdaterTimerTaskTest {
Mockito.when(eventContext.updateCheckpointAsync())
.thenReturn(Mono.just("").then());
- var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext,
processedEvents);
+ var task = new EventHubsCheckpointUpdaterTask(eventContext,
processedEvents);
- timerTask.run();
+ task.run();
assertEquals(0, processedEvents.get());
}
@@ -50,11 +52,26 @@ class EventHubsCheckpointUpdaterTimerTaskTest {
Mockito.when(eventContext.updateCheckpointAsync())
.thenReturn(Mono.error(new RuntimeException()));
- var timerTask = new EventHubsCheckpointUpdaterTimerTask(eventContext,
processedEvents);
+ var task = new EventHubsCheckpointUpdaterTask(eventContext,
processedEvents);
- timerTask.run();
+ task.run();
assertEquals(1, processedEvents.get());
}
+ @Test
+ void testIsExpired() {
+ var processedEvents = new AtomicInteger(0);
+ var eventContext = Mockito.mock(EventContext.class);
+ var task = new EventHubsCheckpointUpdaterTask(eventContext,
processedEvents);
+
+ // Set scheduled time in the past
+ task.setScheduledTime(System.currentTimeMillis() - 1000);
+ assertTrue(task.isExpired());
+
+ // Set scheduled time in the future
+ task.setScheduledTime(System.currentTimeMillis() + 10000);
+ assertFalse(task.isExpired());
+ }
+
}
diff --git
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 8ab1e9884137..1c94496b65b2 100644
---
a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++
b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -28,11 +28,7 @@ import java.security.cert.X509Certificate;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -66,12 +62,12 @@ import org.slf4j.LoggerFactory;
@ManagedResource(description = "MLLP Producer")
public class MllpTcpServerConsumer extends DefaultConsumer {
final Logger log;
- final ExecutorService validationExecutor;
- final ExecutorService consumerExecutor;
final Charset charset;
final Hl7Util hl7Util;
final boolean logPhi;
+ ExecutorService validationExecutor;
+ ExecutorService consumerExecutor;
TcpServerBindThread bindThread;
TcpServerAcceptThread acceptThread;
@@ -84,11 +80,6 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
MllpComponent component = endpoint.getComponent();
this.logPhi = component.getLogPhi();
hl7Util = new Hl7Util(component.getLogPhiMaxBytes(), logPhi);
-
- validationExecutor = Executors.newCachedThreadPool();
- consumerExecutor = new ThreadPoolExecutor(
- 1, getConfiguration().getMaxConcurrentConsumers(),
getConfiguration().getAcceptTimeout(), TimeUnit.MILLISECONDS,
- new SynchronousQueue<>());
}
@Override
@@ -158,6 +149,13 @@ public class MllpTcpServerConsumer extends DefaultConsumer
{
@Override
protected void doStart() throws Exception {
+ // Create executor services using Camel's ExecutorServiceManager for
virtual threads support
+ validationExecutor =
getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newCachedThreadPool(this, "MllpValidation");
+ consumerExecutor =
getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newThreadPool(this, "MllpConsumer",
+ 1, getConfiguration().getMaxConcurrentConsumers());
+
if (bindThread == null || !bindThread.isAlive()) {
bindThread = new TcpServerBindThread(this,
getEndpoint().getSslContextParameters());
@@ -180,11 +178,17 @@ public class MllpTcpServerConsumer extends
DefaultConsumer {
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
- consumerExecutor.shutdownNow();
+ if (consumerExecutor != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(consumerExecutor);
+ consumerExecutor = null;
+ }
if (acceptThread != null) {
acceptThread.interrupt();
}
- validationExecutor.shutdownNow();
+ if (validationExecutor != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(validationExecutor);
+ validationExecutor = null;
+ }
}
public void handleMessageTimeout(String message, byte[] payload, Throwable
cause) {
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
index e4f37fb54418..53c13d9e7037 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceHttpClient.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
@@ -57,12 +56,20 @@ public class SalesforceHttpClient extends HttpClient {
private final ExecutorService workerPool;
+ /**
+ * Creates a SalesforceHttpClient without a worker pool. This constructor
is intended for testing purposes only.
+ * Production code should use the constructor that accepts a CamelContext
and ExecutorService.
+ */
public SalesforceHttpClient() {
- this(null);
+ this(null, null, null);
}
+ /**
+ * Creates a SalesforceHttpClient without a worker pool. This constructor
is intended for testing purposes only.
+ * Production code should use the constructor that accepts a CamelContext
and ExecutorService.
+ */
public SalesforceHttpClient(SslContextFactory.Client sslContextFactory) {
- this(null, Executors.newCachedThreadPool(), sslContextFactory);
+ this(null, null, sslContextFactory);
}
public SalesforceHttpClient(CamelContext context, ExecutorService
workerPool, SslContextFactory.Client sslContextFactory) {
diff --git
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index f8ec4c298133..8276d826064f 100644
---
a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++
b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -315,7 +315,8 @@ public final class SmppUtils {
ExecutorServiceManager manager =
endpoint.getCamelContext().getExecutorServiceManager();
return manager.newSingleThreadScheduledExecutor(service, taskName);
} else {
- LOG.warn("Not using the Camel scheduled thread executor");
+ LOG.warn("CamelContext or ExecutorServiceManager not available. "
+ + "Using fallback executor which does not support virtual threads.");
return Executors.newSingleThreadScheduledExecutor();
}
}
diff --git
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
index 81d18585cdba..a82572f93f87 100644
---
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
+++
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultGroupFactoryStrategy.java
@@ -33,9 +33,9 @@ public class DefaultGroupFactoryStrategy implements
ManagedGroupFactoryStrategy
throws Exception {
if (curator != null) {
- return new DefaultManagedGroupFactory(curator, false);
+ return new DefaultManagedGroupFactory(curator, false,
camelContext);
} else {
- return new DefaultManagedGroupFactory(factory.call(), true);
+ return new DefaultManagedGroupFactory(factory.call(), true,
camelContext);
}
}
diff --git
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
index 12a4beb645b8..43d1a18edd4a 100644
---
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
+++
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/DefaultManagedGroupFactory.java
@@ -16,8 +16,10 @@
*/
package org.apache.camel.component.zookeepermaster.group;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
+import org.apache.camel.CamelContext;
import
org.apache.camel.component.zookeepermaster.group.internal.ZooKeeperGroup;
import
org.apache.camel.component.zookeepermaster.group.internal.ZooKeeperMultiGroup;
import org.apache.curator.framework.CuratorFramework;
@@ -26,10 +28,16 @@ public class DefaultManagedGroupFactory implements
ManagedGroupFactory {
private final CuratorFramework curator;
private final boolean shouldClose;
+ private final CamelContext camelContext;
public DefaultManagedGroupFactory(CuratorFramework curator, boolean
shouldClose) {
+ this(curator, shouldClose, null);
+ }
+
+ public DefaultManagedGroupFactory(CuratorFramework curator, boolean
shouldClose, CamelContext camelContext) {
this.curator = curator;
this.shouldClose = shouldClose;
+ this.camelContext = camelContext;
}
@Override
@@ -39,22 +47,36 @@ public class DefaultManagedGroupFactory implements
ManagedGroupFactory {
@Override
public <T extends NodeState> Group<T> createGroup(String path, Class<T>
clazz) {
- return new ZooKeeperGroup<>(curator, path, clazz);
+ ExecutorService executorService = createExecutorService(path);
+ return new ZooKeeperGroup<>(curator, path, clazz, executorService);
}
@Override
public <T extends NodeState> Group<T> createGroup(String path, Class<T>
clazz, ThreadFactory threadFactory) {
- return new ZooKeeperGroup<>(curator, path, clazz, threadFactory);
+ ExecutorService executorService = createExecutorService(path);
+ return new ZooKeeperGroup<>(curator, path, clazz, executorService);
}
@Override
public <T extends NodeState> Group<T> createMultiGroup(String path,
Class<T> clazz) {
- return new ZooKeeperMultiGroup<>(curator, path, clazz);
+ ExecutorService executorService = createExecutorService(path);
+ return new ZooKeeperMultiGroup<>(curator, path, clazz,
executorService);
}
@Override
public <T extends NodeState> Group<T> createMultiGroup(String path,
Class<T> clazz, ThreadFactory threadFactory) {
- return new ZooKeeperMultiGroup<>(curator, path, clazz, threadFactory);
+ ExecutorService executorService = createExecutorService(path);
+ return new ZooKeeperMultiGroup<>(curator, path, clazz,
executorService);
+ }
+
+ private ExecutorService createExecutorService(String path) {
+ if (camelContext == null) {
+ throw new IllegalStateException(
+ "CamelContext is required to create ExecutorService for virtual threads support. "
+ + "Use the constructor that accepts CamelContext.");
+ }
+ return camelContext.getExecutorServiceManager()
+ .newSingleThreadExecutor(this, "ZooKeeperGroup-" + path);
}
@Override
diff --git
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
index 8fd265340339..d251114fc106 100644
---
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
+++
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroup.java
@@ -120,20 +120,30 @@ public class ZooKeeperGroup<T extends NodeState>
implements Group<T> {
= (CuratorFramework client, ConnectionState newState) ->
handleStateChange(newState);
/**
- * @param client the client
- * @param path path to watch
+ * @param client the client
+ * @param path path to watch
+ * @deprecated Use the constructor that accepts an ExecutorService
from ExecutorServiceManager for virtual
+ * threads support.
*/
+ @Deprecated
public ZooKeeperGroup(CuratorFramework client, String path, Class<T>
clazz) {
this(client, path, clazz, Executors.newSingleThreadExecutor());
+ LOG.warn("Using deprecated ZooKeeperGroup constructor without ExecutorService. "
+ + "This does not support virtual threads. Use ManagedGroupFactory.createGroup() instead.");
}
/**
- * @param client the client
- * @param path path to watch
- * @param threadFactory factory to use when creating internal threads
+ * @param client the client
+ * @param path path to watch
+ * @param threadFactory factory to use when creating internal threads
+ * @deprecated Use the constructor that accepts an
ExecutorService from ExecutorServiceManager for
+ * virtual threads support.
*/
+ @Deprecated
public ZooKeeperGroup(CuratorFramework client, String path, Class<T>
clazz, ThreadFactory threadFactory) {
this(client, path, clazz,
Executors.newSingleThreadExecutor(threadFactory));
+ LOG.warn("Using deprecated ZooKeeperGroup constructor with ThreadFactory. "
+ + "This does not support virtual threads. Use ManagedGroupFactory.createGroup() instead.");
}
/**
@@ -143,6 +153,11 @@ public class ZooKeeperGroup<T extends NodeState>
implements Group<T> {
*/
public ZooKeeperGroup(CuratorFramework client, String path, Class<T>
clazz, final ExecutorService executorService) {
LOG.info("Creating ZK Group for path \"{}\"", path);
+ if (executorService == null) {
+ throw new IllegalArgumentException(
+ "ExecutorService must not be null. "
+ + "Use ExecutorServiceManager to create an ExecutorService for virtual threads support.");
+ }
this.client = client;
this.path = path;
this.clazz = clazz;
diff --git
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
index 5d2953e31e94..518b4e27566c 100644
---
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
+++
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperGroupFactory.java
@@ -23,6 +23,11 @@ import
org.apache.camel.component.zookeepermaster.group.GroupFactory;
import org.apache.camel.component.zookeepermaster.group.NodeState;
import org.apache.curator.framework.CuratorFramework;
+/**
+ * @deprecated Use {@link
org.apache.camel.component.zookeepermaster.group.DefaultManagedGroupFactory}
which supports
+ * virtual threads via ExecutorServiceManager.
+ */
+@Deprecated
public class ZooKeeperGroupFactory implements GroupFactory {
private CuratorFramework curator;
diff --git
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
index ce3555c4a9d0..4dd18bd43613 100644
---
a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
+++
b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/group/internal/ZooKeeperMultiGroup.java
@@ -27,6 +27,11 @@ import org.apache.curator.framework.CuratorFramework;
public class ZooKeeperMultiGroup<T extends NodeState> extends
ZooKeeperGroup<T> implements MultiGroup<T> {
+ /**
+ * @deprecated Use the constructor that accepts an ExecutorService from
ExecutorServiceManager for virtual threads
+ * support.
+ */
+ @Deprecated
public ZooKeeperMultiGroup(CuratorFramework client, String path, Class<T>
clazz) {
super(client, path, clazz);
}
@@ -35,6 +40,11 @@ public class ZooKeeperMultiGroup<T extends NodeState>
extends ZooKeeperGroup<T>
super(client, path, clazz, executorService);
}
+ /**
+ * @deprecated Use the constructor that accepts an ExecutorService from
ExecutorServiceManager for virtual threads
+ * support.
+ */
+ @Deprecated
public ZooKeeperMultiGroup(CuratorFramework client, String path, Class<T>
clazz, ThreadFactory threadFactory) {
super(client, path, clazz, threadFactory);
}