This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 895af82076db CAMEL-20199: Replace synchronized with ReentrantLock for
virtual thread compatibility (#21703)
895af82076db is described below
commit 895af82076dbd772708cad3f60f03797e69486ca
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Mar 5 00:02:09 2026 +0100
CAMEL-20199: Replace synchronized with ReentrantLock for virtual thread
compatibility (#21703)
Virtual threads (JDK 21+) can be pinned to carrier threads when they
enter synchronized blocks, degrading throughput. This replaces
synchronized blocks/methods with ReentrantLock or better concurrent
data structures across core and components:
Core:
- Suppliers.java: synchronized → ReentrantLock for memoization
- DefaultModel.java: 9 synchronized methods → ReentrantLock
- DefaultCamelContext.java: synchronized(model) → model lock
Components:
- Sqs2Endpoint: volatile + ReentrantLock double-checked locking,
removed redundant queueUrlInitialized flag
- MockEndpoint: synchronized → ReentrantLock
- GoogleFirestoreConsumer: synchronized + LinkedList → ConcurrentLinkedQueue
- GooglePubsubConsumer: synchronizedList → CopyOnWriteArrayList
- SubscriptionManager: 12 synchronized constructs → ReentrantLock
- MiloClientConnection: synchronized → ReentrantLock
- CamelNamespace: synchronized → ReentrantLock
---
.../camel/component/aws2/sqs/Sqs2Endpoint.java | 30 +-
.../google/firestore/GoogleFirestoreConsumer.java | 17 +-
.../google/pubsub/GooglePubsubConsumer.java | 13 +-
.../milo/client/MiloClientConnection.java | 26 +-
.../milo/client/internal/SubscriptionManager.java | 123 ++++---
.../milo/server/internal/CamelNamespace.java | 8 +-
.../apache/camel/component/mock/MockEndpoint.java | 7 +-
.../org/apache/camel/impl/DefaultCamelContext.java | 17 +-
.../java/org/apache/camel/impl/DefaultModel.java | 360 ++++++++++++---------
.../org/apache/camel/util/function/Suppliers.java | 14 +-
10 files changed, 376 insertions(+), 239 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index df1d916507c1..14b3316f0e4e 100644
---
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -23,6 +23,8 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
@@ -63,8 +65,8 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
private static final Logger LOG =
LoggerFactory.getLogger(Sqs2Endpoint.class);
private SqsClient client;
- private String queueUrl;
- private boolean queueUrlInitialized;
+ private final Lock queueUrlLock = new ReentrantLock();
+ private volatile String queueUrl;
private Clock clock = new MonotonicClock();
@UriPath(description = "Queue name or ARN")
@@ -157,7 +159,6 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
if (ObjectHelper.isNotEmpty(configuration.getQueueUrl())) {
queueUrl = configuration.getQueueUrl();
- queueUrlInitialized = true;
} else {
// If both region and Account ID is provided the queue URL can be
// built manually.
@@ -167,14 +168,12 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
&&
ObjectHelper.isNotEmpty(configuration.getQueueOwnerAWSAccountId())) {
queueUrl = getAwsEndpointUri() + "/" +
configuration.getQueueOwnerAWSAccountId() + "/"
+ configuration.getQueueName();
- queueUrlInitialized = true;
} else if
(ObjectHelper.isNotEmpty(configuration.getQueueOwnerAWSAccountId())) {
GetQueueUrlRequest.Builder getQueueUrlRequest =
GetQueueUrlRequest.builder();
getQueueUrlRequest.queueName(configuration.getQueueName());
getQueueUrlRequest.queueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId());
GetQueueUrlResponse getQueueUrlResult =
client.getQueueUrl(getQueueUrlRequest.build());
queueUrl = getQueueUrlResult.queueUrl();
- queueUrlInitialized = true;
} else {
initQueueUrl();
}
@@ -205,7 +204,6 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
}
if (ObjectHelper.isNotEmpty(queueUrl)) {
- queueUrlInitialized = true;
break;
}
@@ -385,12 +383,22 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint
implements HeaderFilterS
* If queue does not exist during endpoint initialization, the queueUrl
has to be initialized again. See
* https://issues.apache.org/jira/browse/CAMEL-18968 for more details.
*/
- protected synchronized String getQueueUrl() {
- if (!queueUrlInitialized) {
- LOG.trace("Queue url was not initialized during the start of the
component. Initializing again.");
- initQueueUrl();
+ protected String getQueueUrl() {
+ String url = queueUrl;
+ if (url == null) {
+ queueUrlLock.lock();
+ try {
+ url = queueUrl;
+ if (url == null) {
+ LOG.trace("Queue url was not initialized during the start
of the component. Initializing again.");
+ initQueueUrl();
+ url = queueUrl;
+ }
+ } finally {
+ queueUrlLock.unlock();
+ }
}
- return queueUrl;
+ return url;
}
public int getMaxMessagesPerPoll() {
diff --git
a/components/camel-google/camel-google-firestore/src/main/java/org/apache/camel/component/google/firestore/GoogleFirestoreConsumer.java
b/components/camel-google/camel-google-firestore/src/main/java/org/apache/camel/component/google/firestore/GoogleFirestoreConsumer.java
index 88eec23a70f1..d39188edb737 100644
---
a/components/camel-google/camel-google-firestore/src/main/java/org/apache/camel/component/google/firestore/GoogleFirestoreConsumer.java
+++
b/components/camel-google/camel-google-firestore/src/main/java/org/apache/camel/component/google/firestore/GoogleFirestoreConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.google.firestore;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -48,12 +49,11 @@ public class GoogleFirestoreConsumer extends
ScheduledBatchPollingConsumer {
private final GoogleFirestoreEndpoint endpoint;
private ListenerRegistration listenerRegistration;
- private volatile Queue<Exchange> pendingExchanges;
+ private final Queue<Exchange> pendingExchanges = new
ConcurrentLinkedQueue<>();
public GoogleFirestoreConsumer(GoogleFirestoreEndpoint endpoint, Processor
processor) {
super(endpoint, processor);
this.endpoint = endpoint;
- this.pendingExchanges = new LinkedList<>();
}
@Override
@@ -93,9 +93,7 @@ public class GoogleFirestoreConsumer extends
ScheduledBatchPollingConsumer {
for (DocumentChange dc : snapshots.getDocumentChanges()) {
try {
Exchange exchange =
createExchangeFromDocument(dc.getDocument(), dc.getType());
- synchronized (pendingExchanges) {
- pendingExchanges.add(exchange);
- }
+ pendingExchanges.add(exchange);
} catch (Exception ex) {
LOG.error("Error creating exchange from document
change", ex);
}
@@ -119,10 +117,11 @@ public class GoogleFirestoreConsumer extends
ScheduledBatchPollingConsumer {
Queue<Exchange> exchanges;
if (endpoint.getConfiguration().isRealtimeUpdates()) {
- // Get pending exchanges from realtime listener
- synchronized (pendingExchanges) {
- exchanges = new LinkedList<>(pendingExchanges);
- pendingExchanges.clear();
+ // Drain pending exchanges from realtime listener (lock-free)
+ exchanges = new LinkedList<>();
+ Exchange e;
+ while ((e = pendingExchanges.poll()) != null) {
+ exchanges.add(e);
}
} else {
// Poll the collection
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index ef40e7605580..ac98b6852803 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -18,12 +18,11 @@ package org.apache.camel.component.google.pubsub;
import java.io.IOException;
import java.time.Duration;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -69,7 +68,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
- this.subscribers = Collections.synchronizedList(new LinkedList<>());
+ this.subscribers = new CopyOnWriteArrayList<>();
this.pendingSynchronousPullResponses = ConcurrentHashMap.newKeySet();
String loggerId = endpoint.getLoggerId();
@@ -96,11 +95,9 @@ public class GooglePubsubConsumer extends DefaultConsumer {
protected void doStop() throws Exception {
localLog.info("Stopping Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- synchronized (subscribers) {
- if (!subscribers.isEmpty()) {
- localLog.info("Stopping subscribers for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- subscribers.forEach(AbstractApiService::stopAsync);
- }
+ if (!subscribers.isEmpty()) {
+ localLog.info("Stopping subscribers for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
+ subscribers.forEach(AbstractApiService::stopAsync);
}
safeCancelSynchronousPullResponses();
diff --git
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
index bf21bace3584..e3766528336e 100644
---
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
+++
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.milo.client;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.camel.RuntimeCamelException;
@@ -37,6 +39,7 @@ import static java.util.Objects.requireNonNull;
public class MiloClientConnection implements AutoCloseable {
+ private final Lock lock = new ReentrantLock();
private final MiloClientConfiguration configuration;
private SubscriptionManager manager;
private volatile boolean initialized;
@@ -67,17 +70,22 @@ public class MiloClientConnection implements AutoCloseable {
}
}
- protected synchronized void checkInit() {
- if (this.initialized) {
- return;
- }
-
+ protected void checkInit() {
+ lock.lock();
try {
- init();
- } catch (final Exception e) {
- throw new RuntimeCamelException(e);
+ if (this.initialized) {
+ return;
+ }
+
+ try {
+ init();
+ } catch (final Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ this.initialized = true;
+ } finally {
+ lock.unlock();
}
- this.initialized = true;
}
@FunctionalInterface
diff --git
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
index f62e880e8d87..bce5c27401ce 100644
---
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
+++
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
@@ -33,6 +33,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -582,6 +584,7 @@ public class SubscriptionManager {
private final MiloClientConfiguration configuration;
private final ScheduledExecutorService executor;
private final long reconnectTimeout;
+ private final Lock lock = new ReentrantLock();
private Connected connected;
private boolean disposed;
@@ -598,31 +601,39 @@ public class SubscriptionManager {
connect();
}
- private synchronized void handleConnectionFailure(final Throwable e) {
- if (this.connected != null) {
- this.connected.dispose();
- this.connected = null;
- }
+ private void handleConnectionFailure(final Throwable e) {
+ lock.lock();
+ try {
+ if (this.connected != null) {
+ this.connected.dispose();
+ this.connected = null;
+ }
- // log
+ // log
- LOG.info("Connection failed", e);
+ LOG.info("Connection failed", e);
- // always trigger re-connect
+ // always trigger re-connect
- triggerReconnect(true);
+ triggerReconnect(true);
+ } finally {
+ lock.unlock();
+ }
}
private void connect() {
LOG.info("Starting connect");
- synchronized (this) {
+ lock.lock();
+ try {
this.reconnectJob = null;
if (this.disposed) {
// we woke up disposed
return;
}
+ } finally {
+ lock.unlock();
}
performAndEvalConnect();
@@ -632,7 +643,8 @@ public class SubscriptionManager {
try {
final Connected connected = performConnect();
LOG.debug("Connect call done");
- synchronized (this) {
+ lock.lock();
+ try {
if (this.disposed) {
// we got disposed during connect
return;
@@ -656,6 +668,8 @@ public class SubscriptionManager {
connected.dispose();
throw e;
}
+ } finally {
+ lock.unlock();
}
} catch (final Exception e) {
LOG.info("Failed to connect", e);
@@ -765,12 +779,15 @@ public class SubscriptionManager {
public void dispose() {
Connected connected;
- synchronized (this) {
+ lock.lock();
+ try {
if (this.disposed) {
return;
}
this.disposed = true;
connected = this.connected;
+ } finally {
+ lock.unlock();
}
if (connected != null) {
@@ -779,18 +796,23 @@ public class SubscriptionManager {
}
}
- private synchronized void triggerReconnect(final boolean immediate) {
- LOG.info("Trigger re-connect (immediate: {})", immediate);
+ private void triggerReconnect(final boolean immediate) {
+ lock.lock();
+ try {
+ LOG.info("Trigger re-connect (immediate: {})", immediate);
- if (this.reconnectJob != null) {
- LOG.info("Re-connect already scheduled");
- return;
- }
+ if (this.reconnectJob != null) {
+ LOG.info("Re-connect already scheduled");
+ return;
+ }
- if (immediate) {
- this.reconnectJob = this.executor.submit(this::connect);
- } else {
- this.reconnectJob = this.executor.schedule(this::connect,
this.reconnectTimeout, TimeUnit.MILLISECONDS);
+ if (immediate) {
+ this.reconnectJob = this.executor.submit(this::connect);
+ } else {
+ this.reconnectJob = this.executor.schedule(this::connect,
this.reconnectTimeout, TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -873,13 +895,18 @@ public class SubscriptionManager {
.toString();
}
- protected synchronized void whenConnected(final Worker<Connected> worker) {
- if (this.connected != null) {
- try {
- worker.work(this.connected);
- } catch (final Exception e) {
- handleConnectionFailure(e);
+ protected void whenConnected(final Worker<Connected> worker) {
+ lock.lock();
+ try {
+ if (this.connected != null) {
+ try {
+ worker.work(this.connected);
+ } catch (final Exception e) {
+ handleConnectionFailure(e);
+ }
}
+ } finally {
+ lock.unlock();
}
}
@@ -896,28 +923,37 @@ public class SubscriptionManager {
final UInteger clientHandle =
Unsigned.uint(this.clientHandleCounter.incrementAndGet());
final Subscription subscription = new Subscription(nodeId,
samplingInterval, valueConsumer, monitorFilterConfiguration);
- synchronized (this) {
+ lock.lock();
+ try {
this.subscriptions.put(clientHandle, subscription);
whenConnected(connected -> {
connected.activate(clientHandle, subscription);
});
+ } finally {
+ lock.unlock();
}
return clientHandle;
}
- public synchronized void unregisterItem(final UInteger clientHandle) {
- if (this.subscriptions.remove(clientHandle) != null) {
- whenConnected(connected -> {
- connected.deactivate(clientHandle);
- });
+ public void unregisterItem(final UInteger clientHandle) {
+ lock.lock();
+ try {
+ if (this.subscriptions.remove(clientHandle) != null) {
+ whenConnected(connected -> {
+ connected.deactivate(clientHandle);
+ });
+ }
+ } finally {
+ lock.unlock();
}
}
public CompletableFuture<CallMethodResult> call(
final ExpandedNodeId nodeId, final ExpandedNodeId methodId, final
Variant[] inputArguments) {
- synchronized (this) {
+ lock.lock();
+ try {
if (this.connected == null) {
return newNotConnectedResult();
}
@@ -930,11 +966,14 @@ public class SubscriptionManager {
}
return null;
}, this.executor);
+ } finally {
+ lock.unlock();
}
}
public CompletableFuture<?> write(final ExpandedNodeId nodeId, final
DataValue value) {
- synchronized (this) {
+ lock.lock();
+ try {
if (this.connected == null) {
return newNotConnectedResult();
}
@@ -947,11 +986,14 @@ public class SubscriptionManager {
}
return status;
}, this.executor);
+ } finally {
+ lock.unlock();
}
}
public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds)
{
- synchronized (this) {
+ lock.lock();
+ try {
if (this.connected == null) {
return newNotConnectedResult();
}
@@ -964,13 +1006,16 @@ public class SubscriptionManager {
}
return nodes;
}, this.executor);
+ } finally {
+ lock.unlock();
}
}
public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
List<ExpandedNodeId> expandedNodeIds, BrowseDirection direction,
int nodeClasses, int maxDepth, String filter,
boolean includeSubTypes, int maxNodesPerRequest) {
- synchronized (this) {
+ lock.lock();
+ try {
if (this.connected == null) {
return newNotConnectedResult();
}
@@ -985,6 +1030,8 @@ public class SubscriptionManager {
}
return browseResult;
}, this.executor);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/internal/CamelNamespace.java
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/internal/CamelNamespace.java
index bcb5d88241c5..1a076c375074 100644
---
a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/internal/CamelNamespace.java
+++
b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/internal/CamelNamespace.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.milo.server.internal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.milo.opcua.sdk.core.Reference;
import org.eclipse.milo.opcua.sdk.server.ManagedNamespaceWithLifecycle;
@@ -43,6 +45,7 @@ public class CamelNamespace extends
ManagedNamespaceWithLifecycle {
private UaObjectNode itemsObject;
private UaFolderNode folder;
+ private final Lock lock = new ReentrantLock();
private final Map<String, CamelServerItem> itemMap = new HashMap<>();
private final BinaryDataTypeDictionaryManager dictionaryManager;
@@ -124,9 +127,12 @@ public class CamelNamespace extends
ManagedNamespaceWithLifecycle {
}
public CamelServerItem getOrAddItem(final String itemId) {
- synchronized (this) {
+ lock.lock();
+ try {
return this.itemMap.computeIfAbsent(itemId,
k -> new CamelServerItem(itemId, getNodeContext(),
getNamespaceIndex(), this.itemsObject));
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 07ad9b4c4c43..7d07929a08f5 100644
---
a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++
b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -118,6 +120,7 @@ public class MockEndpoint extends DefaultEndpoint
implements BrowsableEndpoint,
private volatile Map<String, Object> expectedPropertyValues;
private volatile Map<String, Object> expectedVariableValues;
+ private final Lock lock = new ReentrantLock();
private final AtomicInteger counter = new AtomicInteger();
@UriPath(description = "Name of mock endpoint")
@@ -1746,7 +1749,8 @@ public class MockEndpoint extends DefaultEndpoint
implements BrowsableEndpoint,
return (MockComponent) super.getComponent();
}
- protected synchronized void onExchange(Exchange exchange) {
+ protected void onExchange(Exchange exchange) {
+ lock.lock();
try {
if (log) {
String line =
getComponent().getExchangeFormatter().format(exchange);
@@ -1774,6 +1778,7 @@ public class MockEndpoint extends DefaultEndpoint
implements BrowsableEndpoint,
if (latch != null) {
latch.countDown();
}
+ lock.unlock();
}
}
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 335a0988bd84..6e58a61a36df 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -42,7 +42,6 @@ import org.apache.camel.impl.engine.SimpleCamelContext;
import org.apache.camel.model.BeanFactoryDefinition;
import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.FaultToleranceConfigurationDefinition;
-import org.apache.camel.model.Model;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.ModelLifecycleStrategy;
import org.apache.camel.model.ProcessorDefinition;
@@ -106,7 +105,7 @@ public class DefaultCamelContext extends SimpleCamelContext
implements ModelCame
private static final Logger LOG =
LoggerFactory.getLogger(DefaultCamelContext.class);
private static final UuidGenerator UUID = new SimpleUuidGenerator();
- private final Model model = new DefaultModel(this);
+ private final DefaultModel model = new DefaultModel(this);
/**
* Creates the {@link ModelCamelContext} using {@link
org.apache.camel.support.DefaultRegistry} as registry.
@@ -839,9 +838,10 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
@Override
protected boolean removeRoute(String routeId, LoggingLevel loggingLevel)
throws Exception {
- // synchronize on model first to avoid deadlock with concurrent
'addRoutes'
+ // lock on model first to avoid deadlock with concurrent 'addRoutes'
// calls:
- synchronized (model) {
+ model.getLock().lock();
+ try {
getLock().lock();
try {
boolean removed = super.removeRoute(routeId, loggingLevel);
@@ -856,15 +856,20 @@ public class DefaultCamelContext extends
SimpleCamelContext implements ModelCame
} finally {
getLock().unlock();
}
+ } finally {
+ model.getLock().unlock();
}
}
@Override
public boolean removeRoute(String routeId) throws Exception {
- // synchronize on model first to avoid deadlock with concurrent
'addRoutes'
+ // lock on model first to avoid deadlock with concurrent 'addRoutes'
// calls:
- synchronized (model) {
+ model.getLock().lock();
+ try {
return super.removeRoute(routeId);
+ } finally {
+ model.getLock().unlock();
}
}
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
index 3489564c8ee4..8419f9f61992 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.camel.CamelContext;
@@ -68,6 +70,7 @@ import org.apache.camel.util.StringHelper;
public class DefaultModel implements Model {
private final CamelContext camelContext;
+ private final Lock lock = new ReentrantLock();
private ModelReifierFactory modelReifierFactory = new
DefaultModelReifierFactory();
private final List<ModelLifecycleStrategy> modelLifecycleStrategies = new
ArrayList<>();
@@ -94,6 +97,10 @@ public class DefaultModel implements Model {
return camelContext;
}
+ public Lock getLock() {
+ return lock;
+ }
+
@Override
public void addModelLifecycleStrategy(ModelLifecycleStrategy
modelLifecycleStrategy) {
// avoid adding double which can happen with spring xml on spring boot
@@ -154,15 +161,20 @@ public class DefaultModel implements Model {
}
@Override
- public synchronized RouteConfigurationDefinition
getRouteConfigurationDefinition(String id) {
- for (RouteConfigurationDefinition def : routesConfigurations) {
- if
(def.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class))
- .equals(id)) {
- return def;
+ public RouteConfigurationDefinition getRouteConfigurationDefinition(String
id) {
+ lock.lock();
+ try {
+ for (RouteConfigurationDefinition def : routesConfigurations) {
+ if
(def.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class))
+ .equals(id)) {
+ return def;
+ }
}
+ // you can have a global route configuration that has no ID
assigned
+ return routesConfigurations.stream().filter(c -> c.getId() ==
null).findFirst().orElse(null);
+ } finally {
+ lock.unlock();
}
- // you can have a global route configuration that has no ID assigned
- return routesConfigurations.stream().filter(c -> c.getId() ==
null).findFirst().orElse(null);
}
@Override
@@ -173,132 +185,137 @@ public class DefaultModel implements Model {
}
@Override
- public synchronized void addRouteDefinitions(Collection<RouteDefinition>
routeDefinitions) throws Exception {
- if (routeDefinitions == null || routeDefinitions.isEmpty()) {
- return;
- }
+ public void addRouteDefinitions(Collection<RouteDefinition>
routeDefinitions) throws Exception {
+ lock.lock();
+ try {
+ if (routeDefinitions == null || routeDefinitions.isEmpty()) {
+ return;
+ }
- List<RouteDefinition> list;
- if (routeFilter == null) {
- list = new ArrayList<>(routeDefinitions);
- } else {
- list = new ArrayList<>();
- for (RouteDefinition r : routeDefinitions) {
- if (routeFilter.apply(r)) {
- list.add(r);
+ List<RouteDefinition> list;
+ if (routeFilter == null) {
+ list = new ArrayList<>(routeDefinitions);
+ } else {
+ list = new ArrayList<>();
+ for (RouteDefinition r : routeDefinitions) {
+ if (routeFilter.apply(r)) {
+ list.add(r);
+ }
}
}
- }
- removeRouteDefinitions(list);
-
- // special if rest-dsl is inlining routes
- if (camelContext.getRestConfiguration().isInlineRoutes()) {
- List<RouteDefinition> allRoutes = new ArrayList<>();
- allRoutes.addAll(list);
- allRoutes.addAll(this.routeDefinitions);
-
- List<RouteDefinition> toBeRemoved = new ArrayList<>();
- Map<String, RouteDefinition> directs = new HashMap<>();
- for (RouteDefinition r : allRoutes) {
- // does the route start with direct, which is candidate for
rest-dsl
- FromDefinition from = r.getInput();
- if (from != null) {
- String uri = from.getEndpointUri();
- if (uri != null && uri.startsWith("direct:")) {
- directs.put(uri, r);
+ removeRouteDefinitions(list);
+
+ // special if rest-dsl is inlining routes
+ if (camelContext.getRestConfiguration().isInlineRoutes()) {
+ List<RouteDefinition> allRoutes = new ArrayList<>();
+ allRoutes.addAll(list);
+ allRoutes.addAll(this.routeDefinitions);
+
+ List<RouteDefinition> toBeRemoved = new ArrayList<>();
+ Map<String, RouteDefinition> directs = new HashMap<>();
+ for (RouteDefinition r : allRoutes) {
+ // does the route start with direct, which is candidate
for rest-dsl
+ FromDefinition from = r.getInput();
+ if (from != null) {
+ String uri = from.getEndpointUri();
+ if (uri != null && uri.startsWith("direct:")) {
+ directs.put(uri, r);
+ }
}
}
- }
- for (RouteDefinition r : allRoutes) {
- // loop all rest routes
- FromDefinition from = r.getInput();
- if (from != null && !r.isInlined()) {
- // only attempt to inline if not already inlined
- String uri = from.getEndpointUri();
- if (uri != null && uri.startsWith("rest:")) {
- // find first EIP in the outputs (skip abstract which
are onException/intercept
- // etc)
- ToDefinition to = null;
- for (ProcessorDefinition<?> def : r.getOutputs()) {
- if (def.isAbstract()) {
- continue;
- }
- if (def instanceof ToDefinition toDefinition) {
- to = toDefinition;
+ for (RouteDefinition r : allRoutes) {
+ // loop all rest routes
+ FromDefinition from = r.getInput();
+ if (from != null && !r.isInlined()) {
+ // only attempt to inline if not already inlined
+ String uri = from.getEndpointUri();
+ if (uri != null && uri.startsWith("rest:")) {
+ // find first EIP in the outputs (skip abstract
which are onException/intercept
+ // etc)
+ ToDefinition to = null;
+ for (ProcessorDefinition<?> def : r.getOutputs()) {
+ if (def.isAbstract()) {
+ continue;
+ }
+ if (def instanceof ToDefinition toDefinition) {
+ to = toDefinition;
+ }
+ break;
}
- break;
- }
- if (to != null) {
- String toUri = to.getEndpointUri();
- RouteDefinition toBeInlined = directs.get(toUri);
- if (toBeInlined != null) {
- toBeRemoved.add(toBeInlined);
- // inline the source loc:line as starting from
this direct input
- FromDefinition inlinedFrom =
toBeInlined.getInput();
- from.setLocation(inlinedFrom.getLocation());
-
from.setLineNumber(inlinedFrom.getLineNumber());
- // inline by replacing the outputs (preserve
all abstracts such as interceptors)
- List<ProcessorDefinition<?>> toBeRemovedOut =
new ArrayList<>();
- for (ProcessorDefinition<?> out :
r.getOutputs()) {
- // should be removed if to be added via
inlined
- boolean remove =
toBeInlined.getOutputs().stream().anyMatch(o -> o == out);
- if (!remove) {
- remove = !out.isAbstract(); // remove
all non abstract
+ if (to != null) {
+ String toUri = to.getEndpointUri();
+ RouteDefinition toBeInlined =
directs.get(toUri);
+ if (toBeInlined != null) {
+ toBeRemoved.add(toBeInlined);
+ // inline the source loc:line as starting
from this direct input
+ FromDefinition inlinedFrom =
toBeInlined.getInput();
+
from.setLocation(inlinedFrom.getLocation());
+
from.setLineNumber(inlinedFrom.getLineNumber());
+ // inline by replacing the outputs
(preserve all abstracts such as interceptors)
+ List<ProcessorDefinition<?>>
toBeRemovedOut = new ArrayList<>();
+ for (ProcessorDefinition<?> out :
r.getOutputs()) {
+ // should be removed if to be added
via inlined
+ boolean remove =
toBeInlined.getOutputs().stream().anyMatch(o -> o == out);
+ if (!remove) {
+ remove = !out.isAbstract(); //
remove all non abstract
+ }
+ if (remove) {
+ toBeRemovedOut.add(out);
+ }
}
- if (remove) {
- toBeRemovedOut.add(out);
+ r.getOutputs().removeAll(toBeRemovedOut);
+
r.getOutputs().addAll(toBeInlined.getOutputs());
+ // inlined outputs should have re-assigned
parent to this route
+ r.getOutputs().forEach(o ->
o.setParent(r));
+ // and copy over various configurations
+ if (toBeInlined.getRouteId() != null) {
+ r.setId(toBeInlined.getRouteId());
}
+
r.setNodePrefixId(toBeInlined.getNodePrefixId());
+ r.setGroup(toBeInlined.getGroup());
+
r.setAutoStartup(toBeInlined.getAutoStartup());
+ r.setDelayer(toBeInlined.getDelayer());
+ r.setInputType(toBeInlined.getInputType());
+
r.setOutputType(toBeInlined.getOutputType());
+ r.setLogMask(toBeInlined.getLogMask());
+
r.setMessageHistory(toBeInlined.getMessageHistory());
+
r.setStreamCache(toBeInlined.getStreamCache());
+ r.setTrace(toBeInlined.getTrace());
+
r.setStartupOrder(toBeInlined.getStartupOrder());
+
r.setRoutePolicyRef(toBeInlined.getRoutePolicyRef());
+
r.setRouteConfigurationId(toBeInlined.getRouteConfigurationId());
+
r.setRoutePolicies(toBeInlined.getRoutePolicies());
+
r.setShutdownRoute(toBeInlined.getShutdownRoute());
+
r.setShutdownRunningTask(toBeInlined.getShutdownRunningTask());
+
r.setErrorHandlerRef(toBeInlined.getErrorHandlerRef());
+
r.setPrecondition(toBeInlined.getPrecondition());
+ if
(toBeInlined.isErrorHandlerFactorySet()) {
+
r.setErrorHandler(toBeInlined.getErrorHandler());
+ }
+ r.markInlined();
}
- r.getOutputs().removeAll(toBeRemovedOut);
-
r.getOutputs().addAll(toBeInlined.getOutputs());
- // inlined outputs should have re-assigned
parent to this route
- r.getOutputs().forEach(o -> o.setParent(r));
- // and copy over various configurations
- if (toBeInlined.getRouteId() != null) {
- r.setId(toBeInlined.getRouteId());
- }
-
r.setNodePrefixId(toBeInlined.getNodePrefixId());
- r.setGroup(toBeInlined.getGroup());
- r.setAutoStartup(toBeInlined.getAutoStartup());
- r.setDelayer(toBeInlined.getDelayer());
- r.setInputType(toBeInlined.getInputType());
- r.setOutputType(toBeInlined.getOutputType());
- r.setLogMask(toBeInlined.getLogMask());
-
r.setMessageHistory(toBeInlined.getMessageHistory());
- r.setStreamCache(toBeInlined.getStreamCache());
- r.setTrace(toBeInlined.getTrace());
-
r.setStartupOrder(toBeInlined.getStartupOrder());
-
r.setRoutePolicyRef(toBeInlined.getRoutePolicyRef());
-
r.setRouteConfigurationId(toBeInlined.getRouteConfigurationId());
-
r.setRoutePolicies(toBeInlined.getRoutePolicies());
-
r.setShutdownRoute(toBeInlined.getShutdownRoute());
-
r.setShutdownRunningTask(toBeInlined.getShutdownRunningTask());
-
r.setErrorHandlerRef(toBeInlined.getErrorHandlerRef());
-
r.setPrecondition(toBeInlined.getPrecondition());
- if (toBeInlined.isErrorHandlerFactorySet()) {
-
r.setErrorHandler(toBeInlined.getErrorHandler());
- }
- r.markInlined();
}
}
}
}
+ // remove all the routes that was inlined
+ list.removeAll(toBeRemoved);
+ this.routeDefinitions.removeAll(toBeRemoved);
}
- // remove all the routes that was inlined
- list.removeAll(toBeRemoved);
- this.routeDefinitions.removeAll(toBeRemoved);
- }
- for (RouteDefinition r : list) {
- for (ModelLifecycleStrategy s : modelLifecycleStrategies) {
- s.onAddRouteDefinition(r);
+ for (RouteDefinition r : list) {
+ for (ModelLifecycleStrategy s : modelLifecycleStrategies) {
+ s.onAddRouteDefinition(r);
+ }
+ this.routeDefinitions.add(r);
}
- this.routeDefinitions.add(r);
- }
- if (shouldStartRoutes()) {
- ((ModelCamelContext)
getCamelContext()).startRouteDefinitions(list);
+ if (shouldStartRoutes()) {
+ ((ModelCamelContext)
getCamelContext()).startRouteDefinitions(list);
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -308,51 +325,76 @@ public class DefaultModel implements Model {
}
@Override
- public synchronized void
removeRouteDefinitions(Collection<RouteDefinition> routeDefinitions) throws
Exception {
- for (RouteDefinition routeDefinition : routeDefinitions) {
- removeRouteDefinition(routeDefinition);
+ public void removeRouteDefinitions(Collection<RouteDefinition>
routeDefinitions) throws Exception {
+ lock.lock();
+ try {
+ for (RouteDefinition routeDefinition : routeDefinitions) {
+ removeRouteDefinition(routeDefinition);
+ }
+ } finally {
+ lock.unlock();
}
}
@Override
- public synchronized void removeRouteDefinition(RouteDefinition
routeDefinition) throws Exception {
- RouteDefinition toBeRemoved = routeDefinition;
- String id = routeDefinition.getId();
- if (id != null) {
- // remove existing route
- camelContext.getRouteController().stopRoute(id);
- camelContext.removeRoute(id);
- toBeRemoved = getRouteDefinition(id);
- }
- for (ModelLifecycleStrategy s : modelLifecycleStrategies) {
- s.onRemoveRouteDefinition(toBeRemoved);
+ public void removeRouteDefinition(RouteDefinition routeDefinition) throws
Exception {
+ lock.lock();
+ try {
+ RouteDefinition toBeRemoved = routeDefinition;
+ String id = routeDefinition.getId();
+ if (id != null) {
+ // remove existing route
+ camelContext.getRouteController().stopRoute(id);
+ camelContext.removeRoute(id);
+ toBeRemoved = getRouteDefinition(id);
+ }
+ for (ModelLifecycleStrategy s : modelLifecycleStrategies) {
+ s.onRemoveRouteDefinition(toBeRemoved);
+ }
+ this.routeDefinitions.remove(toBeRemoved);
+ } finally {
+ lock.unlock();
}
- this.routeDefinitions.remove(toBeRemoved);
}
@Override
- public synchronized void removeRouteTemplateDefinitions(String pattern)
throws Exception {
- for (RouteTemplateDefinition def : new
ArrayList<>(routeTemplateDefinitions)) {
- if (PatternHelper.matchPattern(def.getId(), pattern)) {
- removeRouteTemplateDefinition(def);
+ public void removeRouteTemplateDefinitions(String pattern) throws
Exception {
+ lock.lock();
+ try {
+ for (RouteTemplateDefinition def : new
ArrayList<>(routeTemplateDefinitions)) {
+ if (PatternHelper.matchPattern(def.getId(), pattern)) {
+ removeRouteTemplateDefinition(def);
+ }
}
+ } finally {
+ lock.unlock();
}
}
@Override
- public synchronized List<RouteDefinition> getRouteDefinitions() {
- return routeDefinitions;
+ public List<RouteDefinition> getRouteDefinitions() {
+ lock.lock();
+ try {
+ return routeDefinitions;
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized RouteDefinition getRouteDefinition(String id) {
- for (RouteDefinition route : routeDefinitions) {
- if
(route.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class))
- .equals(id)) {
- return route;
+ public RouteDefinition getRouteDefinition(String id) {
+ lock.lock();
+ try {
+ for (RouteDefinition route : routeDefinitions) {
+ if
(route.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class))
+ .equals(id)) {
+ return route;
+ }
}
+ return null;
+ } finally {
+ lock.unlock();
}
- return null;
}
@Override
@@ -663,24 +705,34 @@ public class DefaultModel implements Model {
}
@Override
- public synchronized List<RestDefinition> getRestDefinitions() {
- return restDefinitions;
+ public List<RestDefinition> getRestDefinitions() {
+ lock.lock();
+ try {
+ return restDefinitions;
+ } finally {
+ lock.unlock();
+ }
}
@Override
- public synchronized void addRestDefinitions(Collection<RestDefinition>
restDefinitions, boolean addToRoutes)
+ public void addRestDefinitions(Collection<RestDefinition> restDefinitions,
boolean addToRoutes)
throws Exception {
- if (restDefinitions == null || restDefinitions.isEmpty()) {
- return;
- }
+ lock.lock();
+ try {
+ if (restDefinitions == null || restDefinitions.isEmpty()) {
+ return;
+ }
- this.restDefinitions.addAll(restDefinitions);
- if (addToRoutes) {
- // rests are also routes so need to add them there too
- for (final RestDefinition restDefinition : restDefinitions) {
- List<RouteDefinition> routeDefinitions =
restDefinition.asRouteDefinition(camelContext);
- addRouteDefinitions(routeDefinitions);
+ this.restDefinitions.addAll(restDefinitions);
+ if (addToRoutes) {
+ // rests are also routes so need to add them there too
+ for (final RestDefinition restDefinition : restDefinitions) {
+ List<RouteDefinition> routeDefinitions =
restDefinition.asRouteDefinition(camelContext);
+ addRouteDefinitions(routeDefinitions);
+ }
}
+ } finally {
+ lock.unlock();
}
}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/function/Suppliers.java
b/core/camel-util/src/main/java/org/apache/camel/util/function/Suppliers.java
index 7d1d4b45409c..25427619f52f 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/function/Suppliers.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/function/Suppliers.java
@@ -19,6 +19,8 @@ package org.apache.camel.util.function;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -37,17 +39,21 @@ public final class Suppliers {
*/
public static <T> Supplier<T> memorize(Supplier<T> supplier) {
final AtomicReference<T> valueHolder = new AtomicReference<>();
+ final Lock lock = new ReentrantLock();
return new Supplier<>() {
@Override
public T get() {
T supplied = valueHolder.get();
if (supplied == null) {
- synchronized (valueHolder) {
+ lock.lock();
+ try {
supplied = valueHolder.get();
if (supplied == null) {
supplied = Objects.requireNonNull(supplier.get(),
"Supplier should not return null");
valueHolder.lazySet(supplied);
}
+ } finally {
+ lock.unlock();
}
}
return supplied;
@@ -66,12 +72,14 @@ public final class Suppliers {
*/
public static <T> Supplier<T> memorize(ThrowingSupplier<T, ? extends
Exception> supplier, Consumer<Exception> consumer) {
final AtomicReference<T> valueHolder = new AtomicReference<>();
+ final Lock lock = new ReentrantLock();
return new Supplier<>() {
@Override
public T get() {
T supplied = valueHolder.get();
if (supplied == null) {
- synchronized (valueHolder) {
+ lock.lock();
+ try {
supplied = valueHolder.get();
if (supplied == null) {
try {
@@ -81,6 +89,8 @@ public final class Suppliers {
consumer.accept(e);
}
}
+ } finally {
+ lock.unlock();
}
}
return supplied;