ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r431587702
########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; Review comment: We can use `assume` to do this in a more idiomatic way. Same for other places where we do something similar. ########## File path: tests/kafkatest/services/kafka/kafka.py ########## @@ -352,17 +357,18 @@ def start_cmd(self, node): KafkaService.STDOUT_STDERR_CAPTURE) return cmd - def start_node(self, node, timeout_sec=60): + def start_node(self, node, timeout_sec=180): Review comment: Why do we need this change? ########## File path: tests/kafkatest/services/kafka/kafka.py ########## @@ -52,6 +51,7 @@ def advertised_listener(self, node): def listener_security_protocol(self): return "%s:%s" % (self.name, self.security_protocol) + Review comment: Is this intentional? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. 
+ String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteForTLSv1_2() throws Exception { + String node = "0"; + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. 
+ String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); Review comment: Can we set this to `DEFAULT_SSL_PROTOCOL`? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. 
+ */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. Review comment: The important detail is that TLS 1.3 is enabled too, right? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests for the SSL transport layer. + * Checks different versions of the protocol usage on the server and client. + */ +@RunWith(value = Parameterized.class) +public class SslVersionsTransportLayerTest { + private static final int BUFFER_SIZE = 4 * 1024; + private static final Time TIME = Time.SYSTEM; + + private final String tlsServerProtocol; + private final String tlsClientProtocol; + + @Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}") + public static Collection<Object[]> data() { + List<Object[]> values = new ArrayList<>(); + values.add(new Object[] {"TLSv1.2", "TLSv1.2"}); + if (Java.IS_JAVA11_COMPATIBLE) { + values.add(new Object[] {"TLSv1.2", "TLSv1.3"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.2"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.3"}); + } + return values; + } + + public SslVersionsTransportLayerTest(String tlsServerProtocol, String tlsClientProtocol) { + this.tlsServerProtocol = tlsServerProtocol; + this.tlsClientProtocol = tlsClientProtocol; + } + + /** + * Tests that connection success with the default TLS version. 
+ */ + @Test + public void testTLSDefaults() throws Exception { Review comment: Nit: the test should be `testTlsDefaults` to match the naming convention. ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -591,10 +591,7 @@ public void testUnsupportedCipher() throws Exception { createSelector(sslClientConfigs); checkAuthentiationFailed("1", "TLSv1.1"); - server.verifyAuthenticationMetrics(0, 1); - checkAuthentiationFailed("2", "TLSv1"); - server.verifyAuthenticationMetrics(0, 2); Review comment: Why did we remove both lines above? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -1250,7 +1322,7 @@ private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws void run() throws IOException; } - private static class TestSslChannelBuilder extends SslChannelBuilder { + public static class TestSslChannelBuilder extends SslChannelBuilder { Review comment: Do these have to be public or can they be package private? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. 
+ String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteForTLSv1_2() throws Exception { + String node = "0"; + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. 
+ String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(","))); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); Review comment: Same here, can we use `SslConfigs.DEFAULT_SSL_PROTOCOL`? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. Review comment: This comment seems incorrect.
########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. 
+ */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteForTLSv1_2() throws Exception { + String node = "0"; + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(","))); Review comment: Can we replace `Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(","))` with `SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS`? Doesn't the code handle comma-separated Strings? ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used.
+ */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. 
+ */ + @Test + public void testCiphersSuiteForTLSv1_2() throws Exception { Review comment: If we take the suggestions to use `DEFAULT_SSL_PROTOCOL`, then this test is checking that we can negotiate TLSv1.2 if the custom cipher suites are only compatible with TLSv1.2 and we use the default protocol configuration (TLSv1.3). ########## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests for the SSL transport layer. + * Checks different versions of the protocol usage on the server and client. + */ +@RunWith(value = Parameterized.class) +public class SslVersionsTransportLayerTest { + private static final int BUFFER_SIZE = 4 * 1024; + private static final Time TIME = Time.SYSTEM; + + private final String tlsServerProtocol; + private final String tlsClientProtocol; + + @Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}") + public static Collection<Object[]> data() { + List<Object[]> values = new ArrayList<>(); + values.add(new Object[] {"TLSv1.2", "TLSv1.2"}); + if (Java.IS_JAVA11_COMPATIBLE) { + values.add(new Object[] {"TLSv1.2", "TLSv1.3"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.2"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.3"}); + } + return values; + } + + public SslVersionsTransportLayerTest(String tlsServerProtocol, String tlsClientProtocol) { Review comment: Could we have each parameter be a list so that we can set enabled protocols with more than one element? 
########## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests for the SSL transport layer. + * Checks different versions of the protocol usage on the server and client. 
+ */ +@RunWith(value = Parameterized.class) +public class SslVersionsTransportLayerTest { + private static final int BUFFER_SIZE = 4 * 1024; + private static final Time TIME = Time.SYSTEM; + + private final String tlsServerProtocol; + private final String tlsClientProtocol; + + @Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}") + public static Collection<Object[]> data() { + List<Object[]> values = new ArrayList<>(); + values.add(new Object[] {"TLSv1.2", "TLSv1.2"}); + if (Java.IS_JAVA11_COMPATIBLE) { + values.add(new Object[] {"TLSv1.2", "TLSv1.3"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.2"}); + values.add(new Object[] {"TLSv1.3", "TLSv1.3"}); + } + return values; + } + + public SslVersionsTransportLayerTest(String tlsServerProtocol, String tlsClientProtocol) { + this.tlsServerProtocol = tlsServerProtocol; + this.tlsClientProtocol = tlsClientProtocol; + } + + /** + * Tests that connection success with the default TLS version. + */ + @Test + public void testTLSDefaults() throws Exception { + // Create certificates for use by client and server. Add server cert to client truststore and vice versa. 
+ CertStores serverCertStores = new CertStores(true, "server", "localhost"); + CertStores clientCertStores = new CertStores(false, "client", "localhost"); + + Map<String, Object> sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores, tlsClientProtocol); + Map<String, Object> sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores, tlsServerProtocol); + + NioEchoServer server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, + new TestSecurityConfig(sslServerConfigs), + null, + TIME); + Selector selector = createSelector(sslClientConfigs); + + String node = "0"; + selector.connect(node, new InetSocketAddress("localhost", server.port()), BUFFER_SIZE, BUFFER_SIZE); + + if (tlsServerProtocol.equals(tlsClientProtocol)) { + NetworkTestUtils.waitForChannelReady(selector, node); + + int msgSz = 1024 * 1024; + String message = TestUtils.randomString(msgSz); + selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes()))); + while (selector.completedReceives().isEmpty()) { + selector.poll(100L); + } + int totalBytes = msgSz + 4; // including 4-byte size + server.waitForMetric("incoming-byte", totalBytes); + server.waitForMetric("outgoing-byte", totalBytes); + server.waitForMetric("request", 1); + server.waitForMetric("response", 1); + } else { + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); + } + } + + public static Map<String, Object> sslConfig(String tlsServerProtocol) { + Map<String, Object> sslConfig = new HashMap<>(); + + sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsServerProtocol); + sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsServerProtocol)); + Review comment: We probably don't need these empty lines. 
########## File path: clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ########## @@ -622,6 +619,81 @@ public void testUnsupportedTLSVersion() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTLSv1_2_FailsForTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + SSLContext context = SSLContext.getInstance(tlsProtocol); + context.init(null, null, null); + + //Note, that only some ciphers works out of the box. Others requires additional configuration. + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + server.verifyAuthenticationMetrics(0, 1); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. 
+ */ + @Test + public void testCiphersSuiteFailForServerTLSv1_2_ClientTLSv1_3() throws Exception { + if (!Java.IS_JAVA11_COMPATIBLE) + return; + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + server = createEchoServer(SecurityProtocol.SSL); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuite)); + + checkAuthentiationFailed("0", "TLSv1.3"); + } + + /** + * Tests that connections can be made with TLSv1.2 and custom cipher suite. + */ + @Test + public void testCiphersSuiteForTLSv1_2() throws Exception { Review comment: We should also add a similar test where the custom cipher is a TLS 1.3 cipher. ########## File path: tests/docker/run_tests.sh ########## @@ -30,6 +30,6 @@ if [ "$REBUILD" == "t" ]; then fi if ${SCRIPT_DIR}/ducker-ak ssh | grep -q '(none)'; then - ${SCRIPT_DIR}/ducker-ak up -n "${KAFKA_NUM_CONTAINERS}" || die "ducker-ak up failed" + ${SCRIPT_DIR}/ducker-ak up -j 'openjdk:11' -n "${KAFKA_NUM_CONTAINERS}" || die "ducker-ak up failed" Review comment: Do we intend to change this temporarily or permanently? 
########## File path: tests/kafkatest/benchmarks/core/benchmark_test.py ########## @@ -236,9 +238,9 @@ def test_producer_and_consumer(self, compression_type="none", security_protocol= return data @cluster(num_nodes=6) - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) - def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"]) Review comment: Maybe we can configure the TLS version for `benchmark_test.py` and `replication_test.py` only. The rest can use the default, which will be TLS 1.3 for Java 11 and TLS 1.2 for Java 8. That would not inflate test times by too much. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org