[ 
https://issues.apache.org/jira/browse/CASSANDRA-20984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vivekanand Koya updated CASSANDRA-20984:
----------------------------------------
    Test and Documentation Plan: 
Reproduce exception: 

```java
{color:#7a7e85}/*{color}{color:#7a7e85} * Licensed to the Apache Software 
Foundation (ASF) under one{color}{color:#7a7e85} * or more contributor license 
agreements. See the NOTICE file{color}{color:#7a7e85} * distributed with this 
work for additional information{color}{color:#7a7e85} * regarding copyright 
ownership. The ASF licenses this file{color}{color:#7a7e85} * to you under the 
Apache License, Version 2.0 (the{color}{color:#7a7e85} * "License"); you may 
not use this file except in compliance{color}{color:#7a7e85} * with the 
License. You may obtain a copy of the License at{color}{color:#7a7e85} 
*{color}{color:#7a7e85} * 
[http://www.apache.org/licenses/LICENSE-2.0]{color}{color:#7a7e85} 
*{color}{color:#7a7e85} * Unless required by applicable law or agreed to in 
writing, software{color}{color:#7a7e85} * distributed under the License is 
distributed on an "AS IS" BASIS,{color}{color:#7a7e85} * WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either express or implied.{color}{color:#7a7e85} * See 
the License for the specific language governing permissions 
and{color}{color:#7a7e85} * limitations under the 
License.{color}{color:#7a7e85} */{color}{color:#cf8e6d}package 
{color}org.apache.cassandra.net;

{color:#cf8e6d}import {color}java.nio.channels.ClosedChannelException;
{color:#cf8e6d}import {color}java.util.ArrayList;
{color:#cf8e6d}import {color}java.util.HashMap;
{color:#cf8e6d}import {color}java.util.concurrent.ConcurrentHashMap;
{color:#cf8e6d}import {color}java.util.concurrent.ExecutionException;
{color:#cf8e6d}import {color}java.util.concurrent.TimeUnit;
{color:#cf8e6d}import {color}java.util.concurrent.atomic.AtomicInteger;
{color:#cf8e6d}import {color}java.util.function.Consumer;

{color:#cf8e6d}import {color}com.google.common.net.InetAddresses;
{color:#cf8e6d}import {color}org.junit.AfterClass;
{color:#cf8e6d}import {color}org.junit.Assert;
{color:#cf8e6d}import {color}org.junit.Before;
{color:#cf8e6d}import {color}org.junit.BeforeClass;
{color:#cf8e6d}import {color}org.junit.Test;

{color:#cf8e6d}import {color}io.netty.channel.EventLoop;
{color:#cf8e6d}import {color}io.netty.util.concurrent.Future;
{color:#cf8e6d}import {color}org.apache.cassandra.config.DatabaseDescriptor;
{color:#cf8e6d}import 
{color}org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
{color:#cf8e6d}import 
{color}org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.Builder;
{color:#cf8e6d}import {color}org.apache.cassandra.config.ParameterizedClass;
{color:#cf8e6d}import {color}org.apache.cassandra.db.commitlog.CommitLog;
{color:#cf8e6d}import {color}org.apache.cassandra.gms.GossipDigestSyn;
{color:#cf8e6d}import {color}org.apache.cassandra.locator.InetAddressAndPort;
{color:#cf8e6d}import 
{color}org.apache.cassandra.security.DefaultSslContextFactory;
{color:#cf8e6d}import {color}org.apache.cassandra.transport.TlsTestUtils;

{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.Result;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.MessagingService.current_version;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.MessagingService.minimum_version;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.NOT_REQUIRED;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.REQUIRED;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
{color:#cf8e6d}public class {color}StreamingTest
{
{color:#cf8e6d}private static final {color}SocketFactory factory = 
{color:#cf8e6d}new {color}SocketFactory();
{color:#cf8e6d}static final {color}InetAddressAndPort TO_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString({color:#6aab73}"127.0.0.2"{color}),
 {color:#2aacb8}7012{color});
{color:#cf8e6d}static final {color}InetAddressAndPort FROM_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString({color:#6aab73}"127.0.0.1"{color}),
 {color:#2aacb8}7012{color});
{color:#cf8e6d}private volatile {color}Throwable handshakeEx;
@BeforeClass
{color:#cf8e6d}public static void {color}startup()

{ DatabaseDescriptor.daemonInitialization(); CommitLog.instance.start(); }

@AfterClass
{color:#cf8e6d}public static void {color}cleanup() {color:#cf8e6d}throws 
{color}InterruptedException

{ factory.shutdownNow(); }

@Before
{color:#cf8e6d}public void {color}setup()
{
handshakeEx = {color:#cf8e6d}null{color};
}

{color:#cf8e6d}private {color}Future<Result<Result.StreamingSuccess>> 
streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) 
{color:#cf8e6d}throws {color}ExecutionException, InterruptedException
{
InboundSockets inbound = {color:#cf8e6d}new 
{color}InboundSockets({color:#cf8e6d}new 
{color}InboundConnectionSettings().withAcceptMessaging(acceptInbound));
{color:#cf8e6d}try{color}{color:#cf8e6d} {color}{
inbound.open();
InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> 
s.settings.bindAddress).findFirst().get();
EventLoop eventLoop = factory.defaultGroup().next();
Future<Result<Result.StreamingSuccess>> result = initiateStreaming(eventLoop,
{color:#cf8e6d}new {color}OutboundConnectionSettings(endpoint)
.withAcceptVersions(acceptOutbound)
.withDefaults(ConnectionCategory.STREAMING),
SslFallbackConnectionType.SERVER_CONFIG
);
result.awaitUninterruptibly();
{color:#cf8e6d}return {color}result;
}
{color:#cf8e6d}finally{color}{color:#cf8e6d} {color}{
inbound.close().await({color:#2aacb8}1L{color}, TimeUnit.SECONDS);
}
}

@Test
{color:#cf8e6d}public void {color}testIncompatibleVersion() 
{color:#cf8e6d}throws {color}InterruptedException, ExecutionException
{
Future<Result<Result.StreamingSuccess>> result = 
streamingConnect({color:#cf8e6d}new {color}AcceptVersions(current_version + 
{color:#2aacb8}1{color}, current_version + {color:#2aacb8}1{color}), 
{color:#cf8e6d}new {color}AcceptVersions(minimum_version + 
{color:#2aacb8}2{color}, current_version + {color:#2aacb8}3{color}));
{color:#cf8e6d}if {color}(result.isSuccess())

{ Result<Result.StreamingSuccess> nowResult = result.getNow(); 
Assert.assertNull(nowResult.success()); 
Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome); 
Assert.assertEquals(current_version, 
nowResult.incompatible().closestSupportedVersion); 
Assert.assertEquals(current_version, 
nowResult.incompatible().maxMessagingVersion); }

{color:#cf8e6d}else {color}{
Assert.assertTrue({color:#cf8e6d}false{color});
}
}

@Test
{color:#cf8e6d}public void {color}testCompatibleVersion() {color:#cf8e6d}throws 
{color}InterruptedException, ExecutionException
{
Future<Result<Result.StreamingSuccess>> result = 
streamingConnect({color:#cf8e6d}new 
{color}AcceptVersions(MessagingService.minimum_version, current_version + 
{color:#2aacb8}1{color}), {color:#cf8e6d}new 
{color}AcceptVersions(minimum_version + {color:#2aacb8}2{color}, 
current_version + {color:#2aacb8}3{color}));
{color:#cf8e6d}if {color}(result.isSuccess())

{ Result<Result.StreamingSuccess> nowResult = result.getNow(); 
Assert.assertNotNull(nowResult.success().channel); 
Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome); 
Assert.assertEquals(current_version, nowResult.success().messagingVersion); }

{color:#cf8e6d}else {color}{
Assert.assertTrue({color:#cf8e6d}false{color});
}
}

{color:#cf8e6d}private {color}ServerEncryptionOptions 
getServerEncryptionOptions(SslFallbackConnectionType sslConnectionType, 
{color:#cf8e6d}boolean {color}optional)
{
Builder serverEncryptionOptionsBuilder = {color:#cf8e6d}new {color}Builder();

serverEncryptionOptionsBuilder.withOutboundKeystore(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PATH)
.withOutboundKeystorePassword(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PASSWORD)
.withOptional(optional)
.withKeyStore(TlsTestUtils.SERVER_KEYSTORE_PATH)
.withKeyStorePassword(TlsTestUtils.SERVER_KEYSTORE_PASSWORD)
.withTrustStore(TlsTestUtils.SERVER_TRUSTSTORE_PATH).withTrustStorePassword(TlsTestUtils.SERVER_TRUSTSTORE_PASSWORD)
.withSslContextFactory(({color:#cf8e6d}new 
{color}ParameterizedClass(DefaultSslContextFactory.{color:#cf8e6d}class{color}.getName(),
{color:#cf8e6d}new {color}HashMap<>())));

{color:#cf8e6d}if {color}(sslConnectionType == SslFallbackConnectionType.MTLS)

{ 
serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all)
 .withRequireClientAuth(REQUIRED); }

{color:#cf8e6d}else if {color}(sslConnectionType == 
SslFallbackConnectionType.SSL)

{ 
serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all)
 .withRequireClientAuth(NOT_REQUIRED); }

{color:#cf8e6d}return {color}serverEncryptionOptionsBuilder.build();
}

{color:#cf8e6d}private {color}OutboundConnection 
initiateOutbound(InetAddressAndPort endpoint, SslFallbackConnectionType 
connectionType, {color:#cf8e6d}boolean {color}optional) {color:#cf8e6d}throws 
{color}ClosedChannelException
{
{color:#cf8e6d}final {color}OutboundConnectionSettings settings = 
{color:#cf8e6d}new {color}OutboundConnectionSettings(endpoint)
.withAcceptVersions({color:#cf8e6d}new {color}AcceptVersions(minimum_version, 
current_version))
.withDefaults(ConnectionCategory.MESSAGING)
.withEncryption(getServerEncryptionOptions(connectionType, optional))
.withDebugCallbacks({color:#cf8e6d}new {color}HandshakeAcknowledgeChecker(t -> 
handshakeEx = t))
.withFrom(FROM_ADDR);
OutboundConnections outboundConnections = 
OutboundConnections.tryRegister({color:#cf8e6d}new 
{color}ConcurrentHashMap<>(), TO_ADDR, settings);
GossipDigestSyn syn = {color:#cf8e6d}new 
{color}GossipDigestSyn({color:#6aab73}"cluster"{color}, 
{color:#6aab73}"partitioner"{color}, EMPTY_METADATA_IDENTIFIER, 
{color:#cf8e6d}new {color}ArrayList<>({color:#2aacb8}0{color}));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn);
OutboundConnection outboundConnection = 
outboundConnections.connectionFor(message);
outboundConnection.enqueue(message);
{color:#cf8e6d}return {color}outboundConnection;
}
{color:#cf8e6d}private static class {color}HandshakeAcknowledgeChecker 
{color:#cf8e6d}implements {color}OutboundDebugCallbacks
{
{color:#cf8e6d}private final {color}AtomicInteger acks = {color:#cf8e6d}new 
{color}AtomicInteger({color:#2aacb8}0{color});
{color:#cf8e6d}private final {color}Consumer<Throwable> fail;

{color:#cf8e6d}private {color}HandshakeAcknowledgeChecker(Consumer<Throwable> 
fail)
{
{color:#cf8e6d}this{color}.fail = fail;
}

@Override
{color:#cf8e6d}public void {color}onSendSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onSentSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onFailedSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onConnect({color:#cf8e6d}int 
{color}messagingVersion, OutboundConnectionSettings settings)
{
{color:#cf8e6d}if {color}(acks.incrementAndGet() > {color:#2aacb8}1{color})
fail.accept({color:#cf8e6d}new {color}AssertionError({color:#6aab73}"Handshake 
was acknowledged more than once"{color}));
}
}
}
 

```

  was:
```java
{color:#7a7e85}/*
{color}{color:#7a7e85} * Licensed to the Apache Software Foundation (ASF) under 
one
{color}{color:#7a7e85} * or more contributor license agreements. See the NOTICE 
file
{color}{color:#7a7e85} * distributed with this work for additional information
{color}{color:#7a7e85} * regarding copyright ownership. The ASF licenses this 
file
{color}{color:#7a7e85} * to you under the Apache License, Version 2.0 (the
{color}{color:#7a7e85} * "License"); you may not use this file except in 
compliance
{color}{color:#7a7e85} * with the License. You may obtain a copy of the License 
at
{color}{color:#7a7e85} *
{color}{color:#7a7e85} * http://www.apache.org/licenses/LICENSE-2.0
{color}{color:#7a7e85} *
{color}{color:#7a7e85} * Unless required by applicable law or agreed to in 
writing, software
{color}{color:#7a7e85} * distributed under the License is distributed on an "AS 
IS" BASIS,
{color}{color:#7a7e85} * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied.
{color}{color:#7a7e85} * See the License for the specific language governing 
permissions and
{color}{color:#7a7e85} * limitations under the License.
{color}{color:#7a7e85} */
{color}{color:#7a7e85}
{color}{color:#cf8e6d}package {color}org.apache.cassandra.net;

{color:#cf8e6d}import {color}java.nio.channels.ClosedChannelException;
{color:#cf8e6d}import {color}java.util.ArrayList;
{color:#cf8e6d}import {color}java.util.HashMap;
{color:#cf8e6d}import {color}java.util.concurrent.ConcurrentHashMap;
{color:#cf8e6d}import {color}java.util.concurrent.ExecutionException;
{color:#cf8e6d}import {color}java.util.concurrent.TimeUnit;
{color:#cf8e6d}import {color}java.util.concurrent.atomic.AtomicInteger;
{color:#cf8e6d}import {color}java.util.function.Consumer;

{color:#cf8e6d}import {color}com.google.common.net.InetAddresses;
{color:#cf8e6d}import {color}org.junit.AfterClass;
{color:#cf8e6d}import {color}org.junit.Assert;
{color:#cf8e6d}import {color}org.junit.Before;
{color:#cf8e6d}import {color}org.junit.BeforeClass;
{color:#cf8e6d}import {color}org.junit.Test;

{color:#cf8e6d}import {color}io.netty.channel.EventLoop;
{color:#cf8e6d}import {color}io.netty.util.concurrent.Future;
{color:#cf8e6d}import {color}org.apache.cassandra.config.DatabaseDescriptor;
{color:#cf8e6d}import 
{color}org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
{color:#cf8e6d}import 
{color}org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.Builder;
{color:#cf8e6d}import {color}org.apache.cassandra.config.ParameterizedClass;
{color:#cf8e6d}import {color}org.apache.cassandra.db.commitlog.CommitLog;
{color:#cf8e6d}import {color}org.apache.cassandra.gms.GossipDigestSyn;
{color:#cf8e6d}import {color}org.apache.cassandra.locator.InetAddressAndPort;
{color:#cf8e6d}import 
{color}org.apache.cassandra.security.DefaultSslContextFactory;
{color:#cf8e6d}import {color}org.apache.cassandra.transport.TlsTestUtils;

{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.Result;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.MessagingService.current_version;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.net.MessagingService.minimum_version;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.NOT_REQUIRED;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions.ClientAuth.REQUIRED;
{color:#cf8e6d}import static 
{color}org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
{color:#cf8e6d}public class {color}StreamingTest
{
{color:#cf8e6d}private static final {color}SocketFactory factory = 
{color:#cf8e6d}new {color}SocketFactory();
{color:#cf8e6d}static final {color}InetAddressAndPort TO_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString({color:#6aab73}"127.0.0.2"{color}),
 {color:#2aacb8}7012{color});
{color:#cf8e6d}static final {color}InetAddressAndPort FROM_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString({color:#6aab73}"127.0.0.1"{color}),
 {color:#2aacb8}7012{color});
{color:#cf8e6d}private volatile {color}Throwable handshakeEx;
@BeforeClass
{color:#cf8e6d}public static void {color}startup()
{
DatabaseDescriptor.daemonInitialization();
CommitLog.instance.start();
}

@AfterClass
{color:#cf8e6d}public static void {color}cleanup() {color:#cf8e6d}throws 
{color}InterruptedException
{
factory.shutdownNow();
}

@Before
{color:#cf8e6d}public void {color}setup()
{
handshakeEx = {color:#cf8e6d}null{color};
}

{color:#cf8e6d}private {color}Future<Result<Result.StreamingSuccess>> 
streamingConnect(AcceptVersions acceptOutbound, AcceptVersions acceptInbound) 
{color:#cf8e6d}throws {color}ExecutionException, InterruptedException
{
InboundSockets inbound = {color:#cf8e6d}new 
{color}InboundSockets({color:#cf8e6d}new 
{color}InboundConnectionSettings().withAcceptMessaging(acceptInbound));
{color:#cf8e6d}try
{color}{color:#cf8e6d} {color}{
inbound.open();
InetAddressAndPort endpoint = inbound.sockets().stream().map(s -> 
s.settings.bindAddress).findFirst().get();
EventLoop eventLoop = factory.defaultGroup().next();
Future<Result<Result.StreamingSuccess>> result = initiateStreaming(eventLoop,
{color:#cf8e6d}new {color}OutboundConnectionSettings(endpoint)
.withAcceptVersions(acceptOutbound)
.withDefaults(ConnectionCategory.STREAMING),
SslFallbackConnectionType.SERVER_CONFIG
);
result.awaitUninterruptibly();
{color:#cf8e6d}return {color}result;
}
{color:#cf8e6d}finally
{color}{color:#cf8e6d} {color}{
inbound.close().await({color:#2aacb8}1L{color}, TimeUnit.SECONDS);
}
}

@Test
{color:#cf8e6d}public void {color}testIncompatibleVersion() 
{color:#cf8e6d}throws {color}InterruptedException, ExecutionException
{
Future<Result<Result.StreamingSuccess>> result = 
streamingConnect({color:#cf8e6d}new {color}AcceptVersions(current_version + 
{color:#2aacb8}1{color}, current_version + {color:#2aacb8}1{color}), 
{color:#cf8e6d}new {color}AcceptVersions(minimum_version + 
{color:#2aacb8}2{color}, current_version + {color:#2aacb8}3{color}));
{color:#cf8e6d}if {color}(result.isSuccess()) {
Result<Result.StreamingSuccess> nowResult = result.getNow();
Assert.assertNull(nowResult.success());
Assert.assertEquals(Result.Outcome.INCOMPATIBLE, nowResult.outcome);
Assert.assertEquals(current_version, 
nowResult.incompatible().closestSupportedVersion);
Assert.assertEquals(current_version, 
nowResult.incompatible().maxMessagingVersion);

} {color:#cf8e6d}else {color}{
Assert.assertTrue({color:#cf8e6d}false{color});
}
}

@Test
{color:#cf8e6d}public void {color}testCompatibleVersion() {color:#cf8e6d}throws 
{color}InterruptedException, ExecutionException
{
Future<Result<Result.StreamingSuccess>> result = 
streamingConnect({color:#cf8e6d}new 
{color}AcceptVersions(MessagingService.minimum_version, current_version + 
{color:#2aacb8}1{color}), {color:#cf8e6d}new 
{color}AcceptVersions(minimum_version + {color:#2aacb8}2{color}, 
current_version + {color:#2aacb8}3{color}));
{color:#cf8e6d}if {color}(result.isSuccess()) {
Result<Result.StreamingSuccess> nowResult = result.getNow();
Assert.assertNotNull(nowResult.success().channel);
Assert.assertEquals(Result.Outcome.SUCCESS, nowResult.outcome);
Assert.assertEquals(current_version, nowResult.success().messagingVersion);
} {color:#cf8e6d}else {color}{
Assert.assertTrue({color:#cf8e6d}false{color});
}
}

{color:#cf8e6d}private {color}ServerEncryptionOptions 
getServerEncryptionOptions(SslFallbackConnectionType sslConnectionType, 
{color:#cf8e6d}boolean {color}optional)
{
Builder serverEncryptionOptionsBuilder = {color:#cf8e6d}new {color}Builder();

serverEncryptionOptionsBuilder.withOutboundKeystore(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PATH)
.withOutboundKeystorePassword(TlsTestUtils.SERVER_OUTBOUND_KEYSTORE_PASSWORD)
.withOptional(optional)
.withKeyStore(TlsTestUtils.SERVER_KEYSTORE_PATH)
.withKeyStorePassword(TlsTestUtils.SERVER_KEYSTORE_PASSWORD)
.withTrustStore(TlsTestUtils.SERVER_TRUSTSTORE_PATH).withTrustStorePassword(TlsTestUtils.SERVER_TRUSTSTORE_PASSWORD)
.withSslContextFactory(({color:#cf8e6d}new 
{color}ParameterizedClass(DefaultSslContextFactory.{color:#cf8e6d}class{color}.getName(),
{color:#cf8e6d}new {color}HashMap<>())));

{color:#cf8e6d}if {color}(sslConnectionType == SslFallbackConnectionType.MTLS)
{
serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all)
.withRequireClientAuth(REQUIRED);
}
{color:#cf8e6d}else if {color}(sslConnectionType == 
SslFallbackConnectionType.SSL)
{
serverEncryptionOptionsBuilder.withInternodeEncryption(ServerEncryptionOptions.InternodeEncryption.all)
.withRequireClientAuth(NOT_REQUIRED);
}
{color:#cf8e6d}return {color}serverEncryptionOptionsBuilder.build();
}

{color:#cf8e6d}private {color}OutboundConnection 
initiateOutbound(InetAddressAndPort endpoint, SslFallbackConnectionType 
connectionType, {color:#cf8e6d}boolean {color}optional) {color:#cf8e6d}throws 
{color}ClosedChannelException
{
{color:#cf8e6d}final {color}OutboundConnectionSettings settings = 
{color:#cf8e6d}new {color}OutboundConnectionSettings(endpoint)
.withAcceptVersions({color:#cf8e6d}new {color}AcceptVersions(minimum_version, 
current_version))
.withDefaults(ConnectionCategory.MESSAGING)
.withEncryption(getServerEncryptionOptions(connectionType, optional))
.withDebugCallbacks({color:#cf8e6d}new {color}HandshakeAcknowledgeChecker(t -> 
handshakeEx = t))
.withFrom(FROM_ADDR);
OutboundConnections outboundConnections = 
OutboundConnections.tryRegister({color:#cf8e6d}new 
{color}ConcurrentHashMap<>(), TO_ADDR, settings);
GossipDigestSyn syn = {color:#cf8e6d}new 
{color}GossipDigestSyn({color:#6aab73}"cluster"{color}, 
{color:#6aab73}"partitioner"{color}, EMPTY_METADATA_IDENTIFIER, 
{color:#cf8e6d}new {color}ArrayList<>({color:#2aacb8}0{color}));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn);
OutboundConnection outboundConnection = 
outboundConnections.connectionFor(message);
outboundConnection.enqueue(message);
{color:#cf8e6d}return {color}outboundConnection;
}
{color:#cf8e6d}private static class {color}HandshakeAcknowledgeChecker 
{color:#cf8e6d}implements {color}OutboundDebugCallbacks
{
{color:#cf8e6d}private final {color}AtomicInteger acks = {color:#cf8e6d}new 
{color}AtomicInteger({color:#2aacb8}0{color});
{color:#cf8e6d}private final {color}Consumer<Throwable> fail;

{color:#cf8e6d}private {color}HandshakeAcknowledgeChecker(Consumer<Throwable> 
fail)
{
{color:#cf8e6d}this{color}.fail = fail;
}

@Override
{color:#cf8e6d}public void {color}onSendSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onSentSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onFailedSmallFrame({color:#cf8e6d}int 
{color}messageCount, {color:#cf8e6d}int {color}payloadSizeInBytes)
{
}

@Override
{color:#cf8e6d}public void {color}onConnect({color:#cf8e6d}int 
{color}messagingVersion, OutboundConnectionSettings settings)
{
{color:#cf8e6d}if {color}(acks.incrementAndGet() > {color:#2aacb8}1{color})
fail.accept({color:#cf8e6d}new {color}AssertionError({color:#6aab73}"Handshake 
was acknowledged more than once"{color}));
}
}
}
 

```


> java.lang.ClassCastException: Issues while joining
> --------------------------------------------------
>
>                 Key: CASSANDRA-20984
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-20984
>             Project: Apache Cassandra
>          Issue Type: Bug
>          Components: Consistency/Bootstrap and Decommission, 
> Consistency/Streaming
>            Reporter: Vivekanand Koya
>            Assignee: Vivekanand Koya
>            Priority: Normal
>
> From [~rustyrazorblade]'s recommendation
> I've taken a closer look at the code. I've been able to reproduce the issue 
> and have produced a fix. Along the way, I've noticed a few things. 
>  # initiateStreaming() method of OutboundConnectionInitiator is called from 
> NettyStreamingConnectionFactory located in the streaming.async package. 
> Similar method invocation initiateMessaging() method of 
> OutboundConnectionInitiator is made from OutboundConnection (net package). 
> Thus, the enum Outcome is package-private. The sanity check based on 
> OutboundConnectionInitiator.Result.Outcome could be performed from 
> OutboundConnection class. The same check cannot be performed from 
> NettyStreamingConnectionFactory since the outcome field of Result is 
> inaccessible. 
>  # There is an attempt to cast to the generic type SuccessType in 
> OutboundConnectionIntiator.Result class. This behavior is inconsistent with 
> the retry() and incompatible() methods which return their respective classes.
>  # In NettyStreamingConnectionFactory, there appears to be some confusion in 
> the involcation of isSuccess() method. It actually is making the invocation 
> on Netty Future. It should have been on the Result object. On making a 
> successful connect, NettyStreamingConnectionFactory calls success() on 
> Future' s getNow() without checking the type of the cast.
>  # There are no tests for initiateStreaming() method of 
> OutboundConnectionInitiator as there are for initiateMessaging() method of 
> OutboundConnectionInitiator.
> What I've done. # I wrote a test (StreamingTest) that reproduces the issue in 
> [https://lists.apache.org/thread/ykkwhjdpgyqzw5xtol4v5ysz664bxxl3].
>  # I used the instanceof in [https://openjdk.org/jeps/394] to make incorrect 
> comparisons a compile-time error. This is done in OutboundConnection where I 
> check if result.success() instanceof MessagingSuccess and 
> OutboundConnectionInitiator where I return Success safely instead.
>  
> _Please note: this change makes use of a feature in JDK 16 and thus needs a 
> higher minimum JDK._



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to