This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 67ea9d64 Allow user to disable TLS (#356)
67ea9d64 is described below
commit 67ea9d64e58edac4f2abf3ab62deb8f02e2aa180
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Feb 13 12:14:34 2023 +0800
Allow user to disable TLS (#356)
---
.../rocketmq/client/apis/ClientConfiguration.java | 8 ++++-
.../client/apis/ClientConfigurationBuilder.java | 14 +++++++-
.../apache/rocketmq/client/java/impl/Client.java | 38 +++++++++++++++++-----
.../rocketmq/client/java/impl/ClientImpl.java | 5 +++
.../client/java/impl/ClientManagerImpl.java | 8 ++---
.../client/java/metrics/ClientMeterManager.java | 15 +++++++--
.../rocketmq/client/java/rpc/RpcClientImpl.java | 19 +++++++----
7 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 5b0f0041..27148103 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -27,16 +27,18 @@ public class ClientConfiguration {
private final String endpoints;
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
+ private final boolean sslEnabled;
/**
* The caller is supposed to have validated the arguments and handled throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
- Duration requestTimeout) {
+ Duration requestTimeout, boolean sslEnabled) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
+ this.sslEnabled = sslEnabled;
}
public static ClientConfigurationBuilder newBuilder() {
@@ -54,4 +56,8 @@ public class ClientConfiguration {
public Duration getRequestTimeout() {
return requestTimeout;
}
+
+ public boolean isSslEnabled() {
+ return sslEnabled;
+ }
}
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index 5767beac..eb40c88c 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -30,6 +30,7 @@ public class ClientConfigurationBuilder {
private String endpoints;
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
+ private boolean sslEnabled = true;
/**
* Configure the access point with which the SDK should communicate.
@@ -70,6 +71,17 @@ public class ClientConfigurationBuilder {
return this;
}
+ /**
+ * Enable or disable the use of Secure Sockets Layer (SSL) for network transport.
+ *
+ * @param sslEnabled A boolean value indicating whether SSL should be enabled or not.
+ * @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
+ */
+ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
+ this.sslEnabled = sslEnabled;
+ return this;
+ }
+
/**
* Finalize the build of {@link ClientConfiguration}.
*
@@ -78,6 +90,6 @@ public class ClientConfigurationBuilder {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
- return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout);
+ return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index edab8765..4de14ffa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -23,34 +23,56 @@ import org.apache.rocketmq.client.java.route.Endpoints;
public interface Client {
/**
- * @return endpoints.
+ * Retrieve Endpoints Information
+ *
+ * @return the endpoints associated with this client.
*/
Endpoints getEndpoints();
/**
- * Get the unique client identifier for each client.
+ * Get Unique Client Identifier
+ *
+ * <p>Get the unique client identifier for each client.
*
- * @return unique client identifier.
+ * @return a unique client identifier.
*/
ClientId getClientId();
/**
- * @return signature for tls
+ * Get TLS Signature
+ *
+ * @return the signature for TLS (Transport Layer Security).
+ * @throws Exception if an error occurs during the signature generation process.
*/
Metadata sign() throws Exception;
/**
- * Send heart beat to remote {@link Endpoints}.
+ * Check SSL Status
+ *
+ * <p>Check if SSL (Secure Sockets Layer) is enabled.
+ *
+ * @return a boolean value indicating whether SSL is enabled or not.
+ */
+ boolean isSslEnabled();
+
+ /**
+ * Send Heartbeat
+ *
+ * <p> Send a heartbeat to the remote endpoint.
*/
void doHeartbeat();
/**
- * Sync settings to remote.
+ * Sync Settings
+ *
+ * <p>Synchronize client settings with the remote endpoint.
*/
void syncSettings();
/**
- * Do some stats for client.
+ * Do Statistics
+ *
+ * <p>Perform some statistics for the client.
*/
void doStats();
-}
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index c475bfc0..846f0ce9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -531,6 +531,11 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
return Signature.sign(clientConfiguration, clientId);
}
+ @Override
+ public boolean isSslEnabled() {
+ return clientConfiguration.isSslEnabled();
+ }
+
/**
* Send heartbeat data to the appointed endpoint
*
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 99867b98..7ab2f9b6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -155,10 +155,8 @@ public class ClientManagerImpl extends ClientManager {
}
/**
- * Return the RPC client by remote {@link Endpoints}, would create the client automatically if it does not exist.
- *
- * <p>In case of the occasion that {@link RpcClient} is garbage collected before shutdown when invoked
- * concurrently, lock here is essential.
+ * Obtain the RPC client by remote {@link Endpoints}, if it does not already exist, it will be created
+ * automatically.
*
* @param endpoints remote endpoints.
* @return RPC client.
@@ -181,7 +179,7 @@ public class ClientManagerImpl extends ClientManager {
return rpcClient;
}
try {
- rpcClient = new RpcClientImpl(endpoints);
+ rpcClient = new RpcClientImpl(endpoints, client.isSslEnabled());
} catch (SSLException e) {
log.error("Failed to get RPC client, endpoints={}, clientId={}", endpoints, client.getClientId(), e);
throw new ClientException("Failed to generate RPC client", e);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
index 1dd0dfa5..113904c6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java
@@ -23,6 +23,7 @@ import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
@@ -91,10 +92,18 @@ public class ClientMeterManager {
return;
}
final Endpoints endpoints = metric.getEndpoints();
- final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
- .sslContext(sslContext).intercept(new AuthInterceptor(clientConfiguration, clientId));
+ .intercept(new AuthInterceptor(clientConfiguration, clientId));
+
+ if (clientConfiguration.isSslEnabled()) {
+ final SslContextBuilder builder = GrpcSslContexts.forClient();
+ builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ SslContext sslContext = builder.build();
+ channelBuilder.sslContext(sslContext);
+ } else {
+ channelBuilder.usePlaintext();
+ }
+
final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();
if (null != socketAddresses) {
IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 2a17f52e..a40cd398 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -73,17 +73,22 @@ public class RpcClientImpl implements RpcClient {
private long activityNanoTime;
@SuppressWarnings("deprecation")
- public RpcClientImpl(Endpoints endpoints) throws SSLException {
- final SslContextBuilder builder = GrpcSslContexts.forClient();
- builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
- SslContext sslContext = builder.build();
-
+ public RpcClientImpl(Endpoints endpoints, boolean sslEnabled) throws SSLException {
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
.maxInboundMessageSize(GRPC_MAX_MESSAGE_SIZE)
- .intercept(LoggingInterceptor.getInstance())
- .sslContext(sslContext);
+ .intercept(LoggingInterceptor.getInstance());
+
+ if (sslEnabled) {
+ final SslContextBuilder builder = GrpcSslContexts.forClient();
+ builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
+ SslContext sslContext = builder.build();
+ channelBuilder.sslContext(sslContext);
+ } else {
+ channelBuilder.usePlaintext();
+ }
+
// Disable grpc's auto-retry here.
channelBuilder.disableRetry();
final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();