This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cd8fd7bba [ISSUE #6047] Support TLS permissive mode for 5.x client
(#6048)
cd8fd7bba is described below
commit cd8fd7bbafb80ea4d7ff1ad9e3a04fa8fbd2b09f
Author: lk <[email protected]>
AuthorDate: Wed Feb 15 10:35:18 2023 +0800
[ISSUE #6047] Support TLS permissive mode for 5.x client (#6048)
---
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 52 +++++++-----
.../proxy/grpc/OptionalSSLProtocolNegotiator.java | 94 ++++++++++++++++++++++
.../http2proxy/Http2ProtocolProxyHandler.java | 27 ++++---
3 files changed, 143 insertions(+), 30 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 5e1b73505..d496bfd10 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -26,6 +26,7 @@ import
io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
@@ -36,7 +37,6 @@ import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLException;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ServiceProvider;
@@ -48,6 +48,8 @@ import
org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
public class GrpcServerBuilder {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -113,32 +115,42 @@ public class GrpcServerBuilder {
return new GrpcServer(this.serverBuilder.build());
}
- protected void configSslContext(NettyServerBuilder serverBuilder) throws
SSLException, CertificateException {
+ protected void configSslContext(NettyServerBuilder serverBuilder) throws
IOException, CertificateException {
if (null == serverBuilder) {
return;
}
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- boolean tlsTestModeEnable = proxyConfig.isTlsTestModeEnable();
- if (tlsTestModeEnable) {
- SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
-
serverBuilder.sslContext(GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .clientAuth(ClientAuth.NONE)
- .build());
- return;
+
+ TlsMode tlsMode = TlsSystemConfig.tlsMode;
+ if (!TlsMode.DISABLED.equals(tlsMode)) {
+ SslContext sslContext = loadSslContext();
+ if (TlsMode.PERMISSIVE.equals(tlsMode)) {
+ serverBuilder.protocolNegotiator(new
OptionalSSLProtocolNegotiator(sslContext));
+ } else {
+ serverBuilder.sslContext(sslContext);
+ }
}
+ }
- String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
- String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
- try (InputStream serverKeyInputStream =
Files.newInputStream(Paths.get(tlsKeyPath));
- InputStream serverCertificateStream =
Files.newInputStream(Paths.get(tlsCertPath))) {
-
serverBuilder.sslContext(GrpcSslContexts.forServer(serverCertificateStream,
serverKeyInputStream)
+ protected SslContext loadSslContext() throws CertificateException,
IOException {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ if (proxyConfig.isTlsTestModeEnable()) {
+ SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
+ return
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
- .build());
- log.info("TLS configured OK");
- } catch (IOException e) {
- log.error("Failed to load Server key/certificate", e);
+ .build();
+ } else {
+ String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
+ String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
+ try (InputStream serverKeyInputStream =
Files.newInputStream(Paths.get(tlsKeyPath));
+ InputStream serverCertificateStream =
Files.newInputStream(Paths.get(tlsCertPath))) {
+ SslContext res =
GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .clientAuth(ClientAuth.NONE)
+ .build();
+ log.info("load TLS configured OK");
+ return res;
+ }
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
new file mode 100644
index 000000000..bf19abf85
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/OptionalSSLProtocolNegotiator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.grpc;
+
+import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
+import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
+import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
+import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
+import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
+import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+import io.grpc.netty.shaded.io.netty.util.AsciiString;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class OptionalSSLProtocolNegotiator implements
InternalProtocolNegotiator.ProtocolNegotiator {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private final SslContext sslContext;
+ /**
+ * the length of the ssl record header (in bytes)
+ */
+ private static final int SSL_RECORD_HEADER_LENGTH = 5;
+
+ public OptionalSSLProtocolNegotiator(SslContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ @Override
+ public AsciiString scheme() {
+ return AsciiString.of("https");
+ }
+
+ @Override
+ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler
grpcHttp2ConnectionHandler) {
+ ChannelHandler plaintext =
+
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
+ ChannelHandler ssl =
+
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
+ return new PortUnificationServerHandler(ssl, plaintext);
+ }
+
+ @Override
+ public void close() {}
+
+ public static class PortUnificationServerHandler extends
ByteToMessageDecoder {
+ private final ChannelHandler ssl;
+ private final ChannelHandler plaintext;
+
+ public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler
plaintext) {
+ this.ssl = ssl;
+ this.plaintext = plaintext;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out)
+ throws Exception {
+ try {
+ // in SslHandler.isEncrypted, it need at least 5 bytes to
judge is encrypted or not
+ if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
+ return;
+ }
+ if (SslHandler.isEncrypted(in)) {
+ ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
+ } else {
+ ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
+ }
+
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
+ ctx.pipeline().remove(this);
+ } catch (Exception e) {
+ log.error("process protocol negotiator failed.", e);
+ throw e;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 2ba2d3463..033877e16 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
public class Http2ProtocolProxyHandler implements ProtocolHandler {
private static final Logger log =
LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -55,16 +57,21 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
public Http2ProtocolProxyHandler() {
try {
- sslContext = SslContextBuilder
- .forClient()
- .sslProvider(SslProvider.OPENSSL)
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .applicationProtocolConfig(new ApplicationProtocolConfig(
- ApplicationProtocolConfig.Protocol.ALPN,
-
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
-
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
- ApplicationProtocolNames.HTTP_2))
- .build();
+ TlsMode tlsMode = TlsSystemConfig.tlsMode;
+ if (TlsMode.DISABLED.equals(tlsMode)) {
+ sslContext = null;
+ } else {
+ sslContext = SslContextBuilder
+ .forClient()
+ .sslProvider(SslProvider.OPENSSL)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .applicationProtocolConfig(new ApplicationProtocolConfig(
+ ApplicationProtocolConfig.Protocol.ALPN,
+
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2))
+ .build();
+ }
} catch (SSLException e) {
log.error("Failed to create SSLContext for
Http2ProtocolProxyHandler", e);
throw new RuntimeException("Failed to create SSLContext for
Http2ProtocolProxyHandler", e);