This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 07bcafc7839 CAMEL-20199: Remove synchronized block from components A to C
07bcafc7839 is described below
commit 07bcafc783935293af4938ed8f5611a17c9f4f5c
Author: Nicolas Filotto <[email protected]>
AuthorDate: Thu Aug 1 14:53:53 2024 +0200
CAMEL-20199: Remove synchronized block from components A to C
---
.../as2/api/AS2AsyncMDNServerConnection.java | 14 ++-
.../component/as2/api/AS2ServerConnection.java | 14 ++-
.../as2/internal/AS2ConnectionHelper.java | 100 +++++++++++---------
.../atmosphere/websocket/WebsocketComponent.java | 12 +--
.../component/aws2/kinesis/Kinesis2Consumer.java | 16 ++--
.../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 24 ++++-
.../component/bean/AbstractBeanProcessor.java | 10 +-
.../camel/component/bean/MethodInfoCache.java | 18 +---
.../component/braintree/BraintreeComponent.java | 20 ++--
.../camel/component/cometd/CometdComponent.java | 36 ++++---
.../consul/cluster/ConsulClusterView.java | 20 ++--
.../component/couchbase/CouchbaseConsumer.java | 105 +++++++++++----------
...MarshalHeaderWithCustomMarshallFactoryTest.java | 8 +-
.../camel/component/cxf/jaxrs/CxfRsProducer.java | 28 +++---
14 files changed, 234 insertions(+), 191 deletions(-)
diff --git
a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
index 0f9660c3d95..29bc7ab8547 100644
---
a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
+++
b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2AsyncMDNServerConnection.java
@@ -21,6 +21,8 @@ import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
@@ -54,7 +56,7 @@ public class AS2AsyncMDNServerConnection {
private static final String REQUEST_HANDLER_THREAD_NAME_PREFIX =
"AS2AsyncMdnHdlr-";
private static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
private RequestListenerThread listenerThread;
- private final Object lock = new Object();
+ private final Lock lock = new ReentrantLock();
public AS2AsyncMDNServerConnection(Integer portNumber, SSLContext
sslContext)
throws IOException {
@@ -66,7 +68,8 @@ public class AS2AsyncMDNServerConnection {
public void close() {
if (listenerThread != null) {
- synchronized (lock) {
+ lock.lock();
+ try {
try {
listenerThread.serverSocket.close();
} catch (IOException e) {
@@ -74,14 +77,19 @@ public class AS2AsyncMDNServerConnection {
} finally {
listenerThread = null;
}
+ } finally {
+ lock.unlock();
}
}
}
public void receive(String requestUriPattern, HttpRequestHandler handler) {
if (listenerThread != null) {
- synchronized (lock) {
+ lock.lock();
+ try {
listenerThread.registerHandler(requestUriPattern, handler);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
index 930d0dfc05b..08976e9b122 100644
---
a/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
+++
b/components/camel-as2/camel-as2-api/src/main/java/org/apache/camel/component/as2/api/AS2ServerConnection.java
@@ -23,6 +23,8 @@ import java.net.Socket;
import java.net.SocketException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
@@ -213,7 +215,7 @@ public class AS2ServerConnection {
}
private RequestListenerThread listenerThread;
- private final Object lock = new Object();
+ private final Lock lock = new ReentrantLock();
private final String as2Version;
private final String originServer;
private final String serverFqdn;
@@ -267,7 +269,8 @@ public class AS2ServerConnection {
public void close() {
if (listenerThread != null) {
- synchronized (lock) {
+ lock.lock();
+ try {
try {
listenerThread.serversocket.close();
} catch (IOException e) {
@@ -275,14 +278,19 @@ public class AS2ServerConnection {
} finally {
listenerThread = null;
}
+ } finally {
+ lock.unlock();
}
}
}
public void listen(String requestUri, HttpRequestHandler handler) {
if (listenerThread != null) {
- synchronized (lock) {
+ lock.lock();
+ try {
listenerThread.registerHandler(requestUri, handler);
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
index f19f81e6b53..c43297fa924 100644
---
a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
+++
b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/internal/AS2ConnectionHelper.java
@@ -17,8 +17,9 @@
package org.apache.camel.component.as2.internal;
import java.io.IOException;
-import java.util.HashMap;
+import java.io.UncheckedIOException;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.component.as2.AS2Configuration;
import org.apache.camel.component.as2.api.AS2AsyncMDNServerConnection;
@@ -34,9 +35,9 @@ public final class AS2ConnectionHelper {
private static final Logger LOG =
LoggerFactory.getLogger(AS2ConnectionHelper.class);
- private static Map<Integer, AS2ServerConnection> serverConnections = new
HashMap<>();
+ private static final Map<Integer, AS2ServerConnection> serverConnections =
new ConcurrentHashMap<>();
- private static Map<Integer, AS2AsyncMDNServerConnection>
asyncMdnServerConnections = new HashMap<>();
+ private static final Map<Integer, AS2AsyncMDNServerConnection>
asyncMdnServerConnections = new ConcurrentHashMap<>();
/**
* Prevent instantiation
@@ -69,17 +70,20 @@ public final class AS2ConnectionHelper {
*/
public static AS2AsyncMDNServerConnection
createAS2AsyncMDNServerConnection(AS2Configuration configuration)
throws IOException {
- AS2AsyncMDNServerConnection asyncMdnServerConnection
- =
asyncMdnServerConnections.get(configuration.getAsyncMdnPortNumber());
- synchronized (asyncMdnServerConnections) {
- if (asyncMdnServerConnection == null) {
- asyncMdnServerConnection
- = new AS2AsyncMDNServerConnection(
- configuration.getAsyncMdnPortNumber(),
configuration.getSslContext());
-
asyncMdnServerConnections.put(configuration.getAsyncMdnPortNumber(),
asyncMdnServerConnection);
- }
+ try {
+ return asyncMdnServerConnections.computeIfAbsent(
+ configuration.getAsyncMdnPortNumber(),
+ key -> {
+ try {
+ return new AS2AsyncMDNServerConnection(
+ configuration.getAsyncMdnPortNumber(),
configuration.getSslContext());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
- return asyncMdnServerConnection;
}
/**
@@ -90,18 +94,24 @@ public final class AS2ConnectionHelper {
* @throws IOException
*/
public static AS2ServerConnection
createAS2ServerConnection(AS2Configuration configuration) throws IOException {
- synchronized (serverConnections) {
- AS2ServerConnection serverConnection =
serverConnections.get(configuration.getServerPortNumber());
- if (serverConnection == null) {
- serverConnection = new AS2ServerConnection(
- configuration.getAs2Version(),
configuration.getServer(),
- configuration.getServerFqdn(),
configuration.getServerPortNumber(), configuration.getSigningAlgorithm(),
- configuration.getSigningCertificateChain(),
configuration.getSigningPrivateKey(),
- configuration.getDecryptingPrivateKey(),
configuration.getMdnMessageTemplate(),
- configuration.getValidateSigningCertificateChain(),
configuration.getSslContext());
- serverConnections.put(configuration.getServerPortNumber(),
serverConnection);
- }
- return serverConnection;
+ try {
+ return serverConnections.computeIfAbsent(
+ configuration.getServerPortNumber(),
+ key -> {
+ try {
+ return new AS2ServerConnection(
+ configuration.getAs2Version(),
configuration.getServer(),
+ configuration.getServerFqdn(),
configuration.getServerPortNumber(),
+ configuration.getSigningAlgorithm(),
+
configuration.getSigningCertificateChain(),
configuration.getSigningPrivateKey(),
+ configuration.getDecryptingPrivateKey(),
configuration.getMdnMessageTemplate(),
+
configuration.getValidateSigningCertificateChain(),
configuration.getSslContext());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
}
@@ -111,34 +121,30 @@ public final class AS2ConnectionHelper {
}
public static void closeAllServerConnections() {
- synchronized (serverConnections) {
- for (Map.Entry<Integer, AS2ServerConnection> entry :
serverConnections.entrySet()) {
- try {
- int port = entry.getKey();
- LOG.debug("Stopping and closing AS2ServerConnection on
port: {}", port);
- AS2ServerConnection conn = entry.getValue();
- conn.close();
- } catch (Exception e) {
- // ignore
- LOG.debug("Error stopping and closing AS2ServerConnection
due to {}. This exception is ignored",
- e.getMessage(), e);
- }
+ for (Map.Entry<Integer, AS2ServerConnection> entry :
serverConnections.entrySet()) {
+ try {
+ int port = entry.getKey();
+ LOG.debug("Stopping and closing AS2ServerConnection on port:
{}", port);
+ AS2ServerConnection conn = entry.getValue();
+ conn.close();
+ } catch (Exception e) {
+ // ignore
+ LOG.debug("Error stopping and closing AS2ServerConnection due
to {}. This exception is ignored",
+ e.getMessage(), e);
}
}
serverConnections.clear();
}
public static void closeAllAsyncMdnServerConnections() {
- synchronized (asyncMdnServerConnections) {
- for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry :
asyncMdnServerConnections.entrySet()) {
- try {
- int port = entry.getKey();
- LOG.debug("Stopping and closing AsyncMdnServerConnection
on port: {}", port);
- entry.getValue().close();
- } catch (Exception e) {
- LOG.debug("Error stopping and closing
AsyncMdnServerConnection due to {}. This exception is ignored",
- e.getMessage(), e);
- }
+ for (Map.Entry<Integer, AS2AsyncMDNServerConnection> entry :
asyncMdnServerConnections.entrySet()) {
+ try {
+ int port = entry.getKey();
+ LOG.debug("Stopping and closing AsyncMdnServerConnection on
port: {}", port);
+ entry.getValue().close();
+ } catch (Exception e) {
+ LOG.debug("Error stopping and closing AsyncMdnServerConnection
due to {}. This exception is ignored",
+ e.getMessage(), e);
}
}
asyncMdnServerConnections.clear();
diff --git
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
index 7336776a45c..5d48a1067f4 100644
---
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
+++
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketComponent.java
@@ -17,8 +17,8 @@
package org.apache.camel.component.atmosphere.websocket;
import java.net.URI;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.component.servlet.ServletComponent;
import org.apache.camel.component.servlet.ServletEndpoint;
@@ -29,14 +29,14 @@ import org.apache.camel.spi.annotations.Component;
*/
@Component("atmosphere-websocket")
public class WebsocketComponent extends ServletComponent {
- private Map<String, WebSocketStore> stores;
+ private final Map<String, WebSocketStore> stores;
public WebsocketComponent() {
// override the default servlet name of ServletComponent
super(WebsocketEndpoint.class);
setServletName("CamelWsServlet");
- this.stores = new HashMap<>();
+ this.stores = new ConcurrentHashMap<>();
}
@Override
@@ -46,10 +46,6 @@ public class WebsocketComponent extends ServletComponent {
}
WebSocketStore getWebSocketStore(String name) {
- WebSocketStore store;
- synchronized (stores) {
- store = stores.computeIfAbsent(name, k -> new
MemoryWebSocketStore());
- }
- return store;
+ return stores.computeIfAbsent(name, k -> new MemoryWebSocketStore());
}
}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 2b94b9da034..c33cedab3ba 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -16,11 +16,7 @@
*/
package org.apache.camel.component.aws2.kinesis;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -56,9 +52,9 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
private KinesisConnection connection;
private ResumeStrategy resumeStrategy;
- private Map<String, String> currentShardIterators = new
java.util.HashMap<>();
+ private final Map<String, String> currentShardIterators = new
java.util.HashMap<>();
- private List<Shard> currentShardList = new ArrayList<>();
+ private volatile List<Shard> currentShardList = List.of();
private static final String SHARD_MONITOR_EXECUTOR_NAME =
"Kinesis_shard_monitor";
private ScheduledExecutorService shardMonitorExecutor;
@@ -384,12 +380,12 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
return getEndpoint().getConfiguration();
}
- protected synchronized List<Shard> getCurrentShardList() {
+ protected List<Shard> getCurrentShardList() {
return this.currentShardList;
}
- private synchronized void setCurrentShardList(List<Shard> latestShardList)
{
- this.currentShardList = latestShardList;
+ private void setCurrentShardList(List<Shard> latestShardList) {
+ this.currentShardList = List.copyOf(latestShardList);
}
private class ShardMonitor implements Runnable {
diff --git
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 8d23db6f1a5..57c8459fb9c 100644
---
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -25,6 +25,8 @@ import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -63,7 +65,7 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
private static final String TIMEOUT_CHECKER_EXECUTOR_NAME =
"S3_Streaming_Upload_Timeout_Checker";
private AtomicInteger part = new AtomicInteger();
private UploadState uploadAggregate = null;
- private final Object lock = new Object();
+ private final Lock lock = new ReentrantLock();
private transient String s3ProducerToString;
private ScheduledExecutorService timeoutCheckerExecutorService;
@@ -89,11 +91,14 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
@Override
protected void doStop() throws Exception {
- synchronized (lock) {
+ lock.lock();
+ try {
if (ObjectHelper.isNotEmpty(uploadAggregate)) {
uploadPart(uploadAggregate);
completeUpload(uploadAggregate);
}
+ } finally {
+ lock.unlock();
}
if (timeoutCheckerExecutorService != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(timeoutCheckerExecutorService);
@@ -110,12 +115,15 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
@Override
public void run() {
- synchronized (lock) {
+ lock.lock();
+ try {
if (ObjectHelper.isNotEmpty(uploadAggregate)) {
uploadPart(uploadAggregate);
completeUpload(uploadAggregate);
uploadAggregate = null;
}
+ } finally {
+ lock.unlock();
}
}
}
@@ -139,7 +147,8 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
totalSize += b.length;
if (getConfiguration().isMultiPartUpload())
maxRead -= b.length;
- synchronized (lock) {
+ lock.lock();
+ try {
// aggregate with previously received exchanges
if (ObjectHelper.isNotEmpty(uploadAggregate)) {
uploadAggregate.buffer.write(b);
@@ -165,6 +174,8 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
}
continue;
}
+ } finally {
+ lock.unlock();
}
if (state == null) {
state = new UploadState();
@@ -242,13 +253,16 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
if (ObjectHelper.isNotEmpty(state)) {
// exchange wasn't large enough to send, batch it with subsequent
exchanges.
- synchronized (lock) {
+ lock.lock();
+ try {
if (ObjectHelper.isEmpty(this.uploadAggregate)) {
this.uploadAggregate = state;
} else {
// handle potential race condition.
this.uploadAggregate.buffer.write(state.buffer.toByteArray());
}
+ } finally {
+ lock.unlock();
}
}
}
diff --git
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
index ff287aeba53..6f184389ea5 100644
---
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
+++
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.bean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.BeanScope;
import org.apache.camel.Exchange;
@@ -38,7 +41,7 @@ public abstract class AbstractBeanProcessor extends
AsyncProcessorSupport {
private transient Processor processor;
private transient Object bean;
private transient boolean lookupProcessorDone;
- private final Object lock = new Object();
+ private final Lock lock = new ReentrantLock();
private BeanScope scope;
private String method;
private boolean shorthandMethod;
@@ -130,11 +133,14 @@ public abstract class AbstractBeanProcessor extends
AsyncProcessorSupport {
boolean allowCache = scope == null || scope == BeanScope.Singleton;
if (allowCache) {
if (!lookupProcessorDone) {
- synchronized (lock) {
+ lock.lock();
+ try {
lookupProcessorDone = true;
// so if there is a custom type converter for the bean
to processor
target =
exchange.getContext().getTypeConverter().tryConvertTo(Processor.class,
exchange, beanTmp);
processor = target;
+ } finally {
+ lock.unlock();
}
}
} else {
diff --git
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
index 896204b1121..3d52bebae7d 100644
---
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
+++
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/MethodInfoCache.java
@@ -45,13 +45,8 @@ public class MethodInfoCache {
this.methodCache = methodCache;
}
- public synchronized MethodInfo getMethodInfo(Method method) {
- MethodInfo answer = methodCache.get(method);
- if (answer == null) {
- answer = createMethodInfo(method);
- methodCache.put(method, answer);
- }
- return answer;
+ public MethodInfo getMethodInfo(Method method) {
+ return methodCache.computeIfAbsent(method, this::createMethodInfo);
}
protected MethodInfo createMethodInfo(Method method) {
@@ -60,13 +55,8 @@ public class MethodInfoCache {
return info.getMethodInfo(method);
}
- protected synchronized BeanInfo getBeanInfo(Class<?> declaringClass) {
- BeanInfo beanInfo = classCache.get(declaringClass);
- if (beanInfo == null) {
- beanInfo = createBeanInfo(declaringClass);
- classCache.put(declaringClass, beanInfo);
- }
- return beanInfo;
+ protected BeanInfo getBeanInfo(Class<?> declaringClass) {
+ return classCache.computeIfAbsent(declaringClass,
this::createBeanInfo);
}
protected BeanInfo createBeanInfo(Class<?> declaringClass) {
diff --git
a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
index 509897b7700..09d77fb1df4 100644
---
a/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
+++
b/components/camel-braintree/src/main/java/org/apache/camel/component/braintree/BraintreeComponent.java
@@ -16,8 +16,8 @@
*/
package org.apache.camel.component.braintree;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import com.braintreegateway.BraintreeGateway;
import org.apache.camel.CamelContext;
@@ -41,12 +41,12 @@ public class BraintreeComponent extends
AbstractApiComponent<BraintreeApiName, B
public BraintreeComponent() {
super(BraintreeApiName.class, BraintreeApiCollection.getCollection());
- this.gateways = new HashMap<>();
+ this.gateways = new ConcurrentHashMap<>();
}
public BraintreeComponent(CamelContext context) {
super(context, BraintreeApiName.class,
BraintreeApiCollection.getCollection());
- this.gateways = new HashMap<>();
+ this.gateways = new ConcurrentHashMap<>();
}
@Override
@@ -63,20 +63,12 @@ public class BraintreeComponent extends
AbstractApiComponent<BraintreeApiName, B
return new BraintreeEndpoint(uri, this, apiName, methodName,
endpointConfiguration);
}
- public synchronized BraintreeGateway getGateway(BraintreeConfiguration
configuration) {
+ public BraintreeGateway getGateway(BraintreeConfiguration configuration) {
BraintreeGateway gateway;
if (configuration.getAccessToken() != null) {
- gateway = gateways.get(configuration.getAccessToken());
- if (gateway == null) {
- gateway = configuration.newBraintreeGateway();
- gateways.put(configuration.getAccessToken(), gateway);
- }
+ gateway = gateways.computeIfAbsent(configuration.getAccessToken(),
k -> configuration.newBraintreeGateway());
} else {
- gateway = gateways.get(configuration.getMerchantId());
- if (gateway == null) {
- gateway = configuration.newBraintreeGateway();
- gateways.put(configuration.getMerchantId(), gateway);
- }
+ gateway = gateways.computeIfAbsent(configuration.getMerchantId(),
k -> configuration.newBraintreeGateway());
}
return gateway;
}
diff --git
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
index 2f912d78bc5..6aa663fd688 100644
---
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
+++
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdComponent.java
@@ -22,6 +22,8 @@ import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import jakarta.servlet.DispatcherType;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
public class CometdComponent extends DefaultComponent implements
SSLContextParametersAware {
private static final Logger LOG =
LoggerFactory.getLogger(CometdComponent.class);
+ private final Lock connectorsLock = new ReentrantLock();
private final Map<String, ConnectorRef> connectors = new LinkedHashMap<>();
private List<BayeuxServer.BayeuxServerListener> serverListeners;
@@ -116,7 +119,8 @@ public class CometdComponent extends DefaultComponent
implements SSLContextParam
CometdEndpoint endpoint = prodcon.getEndpoint();
String connectorKey = endpoint.getProtocol() + ":" +
endpoint.getUri().getHost() + ":" + endpoint.getPort();
- synchronized (connectors) {
+ connectorsLock.lock();
+ try {
ConnectorRef connectorRef = connectors.get(connectorKey);
if (connectorRef == null) {
ServerConnector connector;
@@ -160,6 +164,8 @@ public class CometdComponent extends DefaultComponent
implements SSLContextParam
}
}
prodcon.setBayeux(bayeux);
+ } finally {
+ connectorsLock.unlock();
}
}
@@ -171,16 +177,17 @@ public class CometdComponent extends DefaultComponent
implements SSLContextParam
String connectorKey = endpoint.getProtocol() + ":" +
endpoint.getUri().getHost() + ":" + endpoint.getPort();
- synchronized (connectors) {
+ connectorsLock.lock();
+ try {
ConnectorRef connectorRef = connectors.get(connectorKey);
- if (connectorRef != null) {
- if (connectorRef.decrement() == 0) {
-
connectorRef.server.removeConnector(connectorRef.connector);
- connectorRef.connector.stop();
- connectorRef.server.stop();
- connectors.remove(connectorKey);
- }
+ if (connectorRef != null && connectorRef.decrement() == 0) {
+ connectorRef.server.removeConnector(connectorRef.connector);
+ connectorRef.connector.stop();
+ connectorRef.server.stop();
+ connectors.remove(connectorKey);
}
+ } finally {
+ connectorsLock.unlock();
}
}
@@ -352,10 +359,15 @@ public class CometdComponent extends DefaultComponent
implements SSLContextParam
@Override
protected void doStop() throws Exception {
- for (ConnectorRef connectorRef : connectors.values()) {
- connectorRef.connector.stop();
+ connectorsLock.lock();
+ try {
+ for (ConnectorRef connectorRef : connectors.values()) {
+ connectorRef.connector.stop();
+ }
+ connectors.clear();
+ } finally {
+ connectorsLock.unlock();
}
- connectors.clear();
super.doStop();
}
diff --git
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
index eea1972290f..cfa1fae71f4 100644
---
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
+++
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/cluster/ConsulClusterView.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.camel.cluster.CamelClusterMember;
@@ -44,6 +46,7 @@ final class ConsulClusterView extends
AbstractCamelClusterView {
private final ConsulClusterConfiguration configuration;
private final ConsulLocalMember localMember;
+ private final Lock sessionIdLock = new ReentrantLock();
private final AtomicReference<String> sessionId;
private final Watcher watcher;
@@ -113,21 +116,25 @@ final class ConsulClusterView extends
AbstractCamelClusterView {
if (keyValueClient.releaseLock(this.path, sessionId.get())) {
LOGGER.debug("Successfully released lock on path '{}' with id
'{}'", path, sessionId.get());
}
-
- synchronized (sessionId) {
+ sessionIdLock.lock();
+ try {
sessionClient.destroySession(sessionId.getAndSet(null));
localMember.setMaster(false);
+ } finally {
+ sessionIdLock.unlock();
}
}
}
private boolean acquireLock() {
- synchronized (sessionId) {
+ sessionIdLock.lock();
+ try {
String sid = sessionId.get();
return (sid != null)
- ? sessionClient.getSessionInfo(sid).map(si ->
keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE)
- : false;
+ && sessionClient.getSessionInfo(sid).map(si ->
keyValueClient.acquireLock(path, sid)).orElse(Boolean.FALSE);
+ } finally {
+ sessionIdLock.unlock();
}
}
@@ -136,7 +143,7 @@ final class ConsulClusterView extends
AbstractCamelClusterView {
// ***********************************************
private final class ConsulLocalMember implements CamelClusterMember {
- private AtomicBoolean master = new AtomicBoolean();
+ private final AtomicBoolean master = new AtomicBoolean();
void setMaster(boolean master) {
if (master && this.master.compareAndSet(false, true)) {
@@ -147,7 +154,6 @@ final class ConsulClusterView extends
AbstractCamelClusterView {
if (!master && this.master.compareAndSet(true, false)) {
LOGGER.debug("Leadership lost for session id {}",
sessionId.get());
fireLeadershipChangedEvent(getLeader().orElse(null));
- return;
}
}
diff --git
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 8b252244f4c..ed171b5fffc 100644
---
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.couchbase;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
@@ -43,6 +46,7 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer implements R
private static final Logger LOG =
LoggerFactory.getLogger(CouchbaseConsumer.class);
+ private final Lock lock = new ReentrantLock();
private final CouchbaseEndpoint endpoint;
private final Bucket bucket;
private final Collection collection;
@@ -109,64 +113,69 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer implements R
}
@Override
- protected synchronized int poll() throws Exception {
- ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(),
endpoint.getViewName(), this.viewOptions);
-
- // okay we have some response from CouchBase so lets mark the consumer
as ready
- forceConsumerAsReady();
+ protected int poll() throws Exception {
+ lock.lock();
+ try {
+ ViewResult result =
bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(),
this.viewOptions);
- if (LOG.isTraceEnabled()) {
- LOG.trace("ViewResponse = {}", result);
- }
+ // okay we have some response from CouchBase so lets mark the
consumer as ready
+ forceConsumerAsReady();
- String consumerProcessedStrategy =
endpoint.getConsumerProcessedStrategy();
- for (ViewRow row : result.rows()) {
- Object doc;
- String id = row.id().get();
- if (endpoint.isFullDocument()) {
- doc = CouchbaseCollectionOperation.getDocument(collection, id,
endpoint.getQueryTimeout());
- } else {
- doc = row.valueAs(Object.class);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ViewResponse = {}", result);
}
- String key = row.keyAs(JsonNode.class).get().asText();
- String designDocumentName = endpoint.getDesignDocumentName();
- String viewName = endpoint.getViewName();
-
- Exchange exchange = createExchange(false);
- try {
- exchange.getIn().setBody(doc);
- exchange.getIn().setHeader(HEADER_ID, id);
- exchange.getIn().setHeader(HEADER_KEY, key);
- exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME,
designDocumentName);
- exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
-
- if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Deleting doc with ID {}", id);
- }
- CouchbaseCollectionOperation.removeDocument(collection,
id, endpoint.getWriteQueryTimeout(),
- endpoint.getProducerRetryPause());
- } else if
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Filtering out ID {}", id);
- }
- // add filter for already processed docs
+ String consumerProcessedStrategy =
endpoint.getConsumerProcessedStrategy();
+ for (ViewRow row : result.rows()) {
+ Object doc;
+ String id = row.id().get();
+ if (endpoint.isFullDocument()) {
+ doc = CouchbaseCollectionOperation.getDocument(collection,
id, endpoint.getQueryTimeout());
} else {
- LOG.trace("No strategy set for already processed docs,
beware of duplicates!");
+ doc = row.valueAs(Object.class);
}
- logDetails(id, doc, key, designDocumentName, viewName,
exchange);
+ String key = row.keyAs(JsonNode.class).get().asText();
+ String designDocumentName = endpoint.getDesignDocumentName();
+ String viewName = endpoint.getViewName();
+
+ Exchange exchange = createExchange(false);
+ try {
+ exchange.getIn().setBody(doc);
+ exchange.getIn().setHeader(HEADER_ID, id);
+ exchange.getIn().setHeader(HEADER_KEY, key);
+ exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME,
designDocumentName);
+ exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
+
+ if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Deleting doc with ID {}", id);
+ }
+
CouchbaseCollectionOperation.removeDocument(collection, id,
endpoint.getWriteQueryTimeout(),
+ endpoint.getProducerRetryPause());
+ } else if
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Filtering out ID {}", id);
+ }
+ // add filter for already processed docs
+ } else {
+ LOG.trace("No strategy set for already processed docs,
beware of duplicates!");
+ }
+
+ logDetails(id, doc, key, designDocumentName, viewName,
exchange);
- getProcessor().process(exchange);
- } catch (Exception e) {
- this.getExceptionHandler().handleException("Error processing
exchange.", exchange, e);
- } finally {
- releaseExchange(exchange, false);
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ this.getExceptionHandler().handleException("Error
processing exchange.", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
+ }
}
- }
- return result.rows().size();
+ return result.rows().size();
+ } finally {
+ lock.unlock();
+ }
}
private void logDetails(String id, Object doc, String key, String
designDocumentName, String viewName, Exchange exchange) {
diff --git
a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
index eb16e14892d..6033d150f1d 100644
---
a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
+++
b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvMarshalHeaderWithCustomMarshallFactoryTest.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.apache.camel.Exchange;
@@ -115,6 +117,7 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest
extends CamelTestSupp
private static final class SinglePrinterCsvMarshaller extends
CsvMarshaller {
+ private final Lock lock = new ReentrantLock();
private final CSVPrinter printer;
private SinglePrinterCsvMarshaller(CSVFormat format) {
@@ -134,7 +137,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest
extends CamelTestSupp
@Override
@SuppressWarnings("unchecked")
public void marshal(Exchange exchange, Object object, OutputStream
outputStream) throws IOException {
- synchronized (printer) {
+ lock.lock();
+ try {
if (object instanceof Map) {
Map map = (Map) object;
printer.printRecord(getMapRecordValues(map));
@@ -149,6 +153,8 @@ public class CsvMarshalHeaderWithCustomMarshallFactoryTest
extends CamelTestSupp
outputStream.write(stringBuilder.toString().getBytes());
// Reset the 'Appendable' for the next exchange.
stringBuilder.setLength(0);
+ } finally {
+ lock.unlock();
}
}
diff --git
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 809bd7c8801..d03dfcee08f 100644
---
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -879,7 +879,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
* Cache contains {@link
org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean}
*/
class ClientFactoryBeanCache {
- private Map<String, JAXRSClientFactoryBean> cache;
+ private final Map<String, JAXRSClientFactoryBean> cache;
ClientFactoryBeanCache(final int maxCacheSize) {
this.cache = LRUCacheFactory.newLRUSoftCache(maxCacheSize);
@@ -896,22 +896,16 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
public JAXRSClientFactoryBean get(String address) {
- JAXRSClientFactoryBean retVal = null;
- synchronized (cache) {
- retVal = cache.get(address);
-
- if (retVal == null) {
- retVal = ((CxfRsEndpoint)
getEndpoint()).createJAXRSClientFactoryBean(address);
-
- cache.put(address, retVal);
-
- LOG.trace("Created client factory bean and add to cache
for address '{}'", address);
-
- } else {
- LOG.trace("Retrieved client factory bean from cache for
address '{}'", address);
- }
- }
- return retVal;
+ return cache.compute(address,
+ (key, value) -> {
+ if (value == null) {
+ value = ((CxfRsEndpoint)
getEndpoint()).createJAXRSClientFactoryBean(address);
+ LOG.trace("Created client factory bean and add to
cache for address '{}'", address);
+ } else {
+ LOG.trace("Retrieved client factory bean from
cache for address '{}'", address);
+ }
+ return value;
+ });
}
}
}