This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4318e74180d710844b075fedc33e8ca008f87f63
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Mar 24 15:15:53 2025 +0100

    Various gossip to TCM upgrade fixes
    
    patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20483
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/FailureDetector.java  |  29 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java    |  42 +++++++-
 .../apache/cassandra/service/StorageService.java   |   3 +-
 .../cassandra/tcm/ClusterMetadataService.java      |   7 ++
 src/java/org/apache/cassandra/tcm/Startup.java     |  25 +++--
 .../cassandra/tcm/compatibility/GossipHelper.java  |  72 ++++++++++++-
 .../apache/cassandra/tcm/membership/Directory.java |   2 +-
 .../cassandra/tcm/membership/NodeVersion.java      |   2 +-
 .../apache/cassandra/tcm/migration/Election.java   |  13 ++-
 .../cassandra/tcm/migration/GossipCMSListener.java |   3 +
 .../tcm/sequences/BootstrapAndReplace.java         |  12 +++
 .../tcm/sequences/ReplaceSameAddress.java          |   1 +
 .../apache/cassandra/utils/CassandraVersion.java   |   1 +
 .../cassandra/distributed/UpgradeableCluster.java  |  15 +++
 .../ClusterMetadataUpgradeAssassinateTest.java     |  29 +++--
 .../ClusterMetadataUpgradeHibernateTest.java       | 119 +++++++++++++++++++++
 .../ClusterMetadataUpgradeJoinRingTest.java        | 102 ++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |  19 +++-
 .../tcm/compatibility/GossipHelperTest.java        |  35 +++++-
 20 files changed, 488 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 864e664620..0449ba45d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Various gossip to TCM upgrade fixes (CASSANDRA-20483)
  * Add nodetool command to abort failed nodetool cms initialize (CASSANDRA-20482)
  * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467)
  * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 34c98da4b7..49b2089297 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -47,12 +47,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.MultiStepOperation;
 import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
@@ -150,7 +151,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
         {
             sb.append(resolveIp ? entry.getKey().getHostName(withPort) : 
entry.getKey().toString(withPort)).append("\n");
-            appendEndpointState(sb, entry.getValue());
+            appendEndpointState(sb, entry.getKey(), entry.getValue());
         }
         return sb.toString();
     }
@@ -242,12 +243,13 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     public String getEndpointState(String address) throws UnknownHostException
     {
         StringBuilder sb = new StringBuilder();
-        EndpointState endpointState = 
Gossiper.instance.getEndpointStateForEndpoint(InetAddressAndPort.getByName(address));
-        appendEndpointState(sb, endpointState);
+        InetAddressAndPort endpoint = InetAddressAndPort.getByName(address);
+        EndpointState endpointState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        appendEndpointState(sb, endpoint, endpointState);
         return sb.toString();
     }
 
-    private void appendEndpointState(StringBuilder sb, EndpointState 
endpointState)
+    private void appendEndpointState(StringBuilder sb, InetAddressAndPort 
endpoint, EndpointState endpointState)
     {
         sb.append("  
generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n");
         sb.append("  
heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n");
@@ -258,9 +260,20 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
             sb.append("  
").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n");
         }
         ClusterMetadata metadata = ClusterMetadata.current();
-        NodeId nodeId = 
metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort());
-        List<Token> tokens = metadata.tokenMap.tokens(nodeId);
-        if (tokens != null && !tokens.isEmpty())
+        NodeId nodeId = metadata.directory.peerId(endpoint);
+        boolean foundTokens = false;
+        if (nodeId != null)
+        {
+            foundTokens = !metadata.tokenMap.tokens(nodeId).isEmpty();
+            if (!foundTokens)
+            {
+                MultiStepOperation<?> mso = 
metadata.inProgressSequences.get(nodeId);
+                if (mso instanceof BootstrapAndReplace)
+                    foundTokens = true;
+            }
+        }
+
+        if (foundTokens)
             sb.append("  
TOKENS:").append(metadata.epoch.getEpoch()).append(":<hidden>\n");
         else
             sb.append("  TOKENS: not present\n");
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 5274a57ffb..14cc5f5ada 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -454,6 +454,26 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
         return state.equals(VersionedValue.SHUTDOWN);
     }
 
+    public static boolean isHibernate(EndpointState epState)
+    {
+        VersionedValue versionedValue = 
epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
+        if (versionedValue == null)
+            versionedValue = 
epState.getApplicationState(ApplicationState.STATUS);
+        return isHibernate(versionedValue);
+    }
+
+    public static boolean isHibernate(VersionedValue vv)
+    {
+        if (vv == null)
+            return false;
+
+        String value = vv.value;
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+        return state.equals(VersionedValue.HIBERNATE);
+    }
+
     public static void runInGossipStageBlocking(Runnable runnable)
     {
         // run immediately if we're already in the gossip stage
@@ -2106,10 +2126,18 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
      */
     public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata)
     {
-        mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId));
+        mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId), 
false);
+    }
+    public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, 
boolean forceHibernate)
+    {
+        mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId), 
forceHibernate);
     }
-
     public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, 
Collection<Token> tokens)
+    {
+        mergeNodeToGossip(nodeId, metadata, tokens, false);
+    }
+
+    private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, 
Collection<Token> tokens, boolean forceHibernate)
     {
         taskLock.lock();
         try
@@ -2156,7 +2184,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
                             newValue = valueFactory.hostId(uuid);
                             break;
                         case TOKENS:
-                            if (tokens != null)
+                            if (tokens != null && !tokens.isEmpty())
                                 newValue = valueFactory.tokens(tokens);
                             break;
                         case INTERNAL_ADDRESS_AND_PORT:
@@ -2175,6 +2203,14 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean,
                             // In this case, the app state will be set to 
`hibernate` by StorageService, so
                             // don't set it here as nodeStateToStatus only 
considers persistent states (e.g.
                             // ones stored in ClusterMetadata), it isn't aware 
of transient states like hibernate.
+                            // forceHibernate can be true when upgrading from pre-tcm versions - if a node is hibernating
+                            // we have no state for this in cluster metadata, so we need to explicitly keep that from
+                            // the pre-upgrade gossip states
+                            if (forceHibernate)
+                            {
+                                newValue = valueFactory.hibernate(true);
+                                break;
+                            }
                             if (isLocal && 
!StorageService.instance.shouldJoinRing())
                                 break;
                             newValue = GossipHelper.nodeStateToStatus(nodeId, 
metadata, tokens, valueFactory, oldValue);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 53f815a586..812dc31cd4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2084,8 +2084,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // normal STATUS.
         if (state == ApplicationState.STATUS_WITH_PORT)
         {
-            String[] pieces = splitValue(value);
-            if (pieces[0].equals(VersionedValue.HIBERNATE))
+            if (Gossiper.isHibernate(value))
             {
                 logger.info("Node {} state jump to hibernate", endpoint);
                 Gossiper.runInGossipStageBlocking(() -> {
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 4c6eed11b4..3e3d8389ae 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -318,6 +318,13 @@ public class ClusterMetadataService
         }
 
         ClusterMetadata metadata = metadata();
+        if (metadata.myNodeState() != NodeState.JOINED)
+        {
+            String msg = String.format("Initial CMS node needs to be fully 
joined, not: %s", metadata.myNodeState());
+            logger.error(msg);
+            throw new IllegalStateException(msg);
+        }
+
         Set<InetAddressAndPort> existingMembers = metadata.fullCMSMembers();
 
         if (!metadata.directory.allAddresses().containsAll(ignored))
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index edd8734fca..d17b687698 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -74,6 +74,7 @@ import static 
org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
 import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables;
 import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates;
 import static org.apache.cassandra.tcm.membership.NodeState.JOINED;
+import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
 import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
  /**
@@ -313,18 +314,26 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         logger.debug("Created initial ClusterMetadata {}", initial);
         ClusterMetadataService.instance().setFromGossip(initial);
         Gossiper.instance.clearUnsafe();
-        if (switchIp != null)
-        {
-            // quarantine the old ip to make sure it doesn't get re-added via 
gossip
-            InetAddressAndPort removeEp = switchIp;
-            Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.removeEndpoint(removeEp));
-        }
+
+        // find any endpoints that were ignored on upgrade and make sure they don't get re-added in gossip
+        InetAddressAndPort removeEp = switchIp;
+        Gossiper.runInGossipStageBlocking(() -> {
+            if (removeEp != null)
+                Gossiper.instance.removeEndpoint(removeEp);
+            for (InetAddressAndPort ep : epStates.keySet())
+            {
+                if (initial.directory.peerId(ep) == null)
+                    Gossiper.instance.removeEndpoint(ep); // just quarantines the ep - endpoint states should be empty
+                                                          // here (this is run before Gossiper is started)
+            }
+        });
+
         
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
         for (Map.Entry<NodeId, NodeState> entry : 
initial.directory.states.entrySet())
         {
             InetAddressAndPort ep = 
initial.directory.addresses.get(entry.getKey()).broadcastAddress;
-            if (entry.getValue() != NodeState.LEFT)
-                Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial);
+            if (entry.getValue() != LEFT)
+                Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial, 
Gossiper.isHibernate(epStates.get(ep)));
             else
                 Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.endpointStateMap.put(ep, epStates.get(ep)));
         }
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 0e1acbc4ea..1a555fce4d 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -87,6 +87,7 @@ import static 
org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION;
 import static org.apache.cassandra.gms.ApplicationState.RPC_ADDRESS;
 import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT;
 import static org.apache.cassandra.gms.ApplicationState.TOKENS;
+import static org.apache.cassandra.gms.Gossiper.isHibernate;
 import static org.apache.cassandra.gms.Gossiper.isShutdown;
 import static org.apache.cassandra.locator.InetAddressAndPort.getByName;
 import static 
org.apache.cassandra.locator.InetAddressAndPort.getByNameOverrideDefaults;
@@ -123,6 +124,8 @@ public class GossipHelper
             case JOINED:
                 if (isShutdown(oldValue))
                     status = valueFactory.shutdown(true);
+                else if (isHibernate(oldValue))
+                    status = valueFactory.hibernate(true);
                 else
                     status = valueFactory.normal(tokens);
                 break;
@@ -224,10 +227,13 @@ public class GossipHelper
 
         String status = epState.getStatus();
         if (status.equals(VersionedValue.STATUS_NORMAL) ||
-            status.equals(VersionedValue.SHUTDOWN))
+            status.equals(VersionedValue.SHUTDOWN) ||
+            status.equals(VersionedValue.HIBERNATE))
             return NodeState.JOINED;
         if (status.equals(VersionedValue.STATUS_LEFT))
             return NodeState.LEFT;
+        if (status.isEmpty())
+            return NodeState.REGISTERED;
         throw new IllegalStateException("Can't upgrade the first node when 
STATUS = " + status + " for node " + endpoint);
     }
 
@@ -332,6 +338,12 @@ public class GossipHelper
     {
         Directory directory = new 
Directory().withLastModified(Epoch.UPGRADE_GOSSIP);
         TokenMap tokenMap = new 
TokenMap(partitioner).withLastModified(Epoch.UPGRADE_GOSSIP);
+
+        // gossip can contain old hosts with duplicate host ids during upgrades from pre-TCM versions. We need to clean
+        // those up during upgrades since TCM is more strict. Here we simply keep the host with the newest gossip
+        // generation if there is a duplicate hostid.
+        if (containsDuplicateHostIds(epStates))
+            epStates = cleanupDuplicateHostIds(epStates);
         List<InetAddressAndPort> sortedEps = 
Lists.newArrayList(epStates.keySet());
         Collections.sort(sortedEps);
         Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new 
HashMap<>();
@@ -344,14 +356,20 @@ public class GossipHelper
             NodeAddresses nodeAddresses = 
getAddressesFromEndpointState(endpoint, epState);
             NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, 
epState);
             assert hostIdString != null;
-            NodeState nodeState = toNodeState(endpoint, epState);
+            // some clusters have old, removed hibernating endpoints in gossip, ignore these if they don't exist in our system.peers_v2 table
+            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && 
Gossiper.isHibernate(epState) && 
!SystemKeyspace.loadTokens().containsKey(endpoint))
+            {
+                logger.info("Ignoring endpoint {} with endpoint states {} 
since it is missing from system.peers_v2", endpoint, epState);
+                continue;
+            }
 
+            NodeState nodeState = toNodeState(endpoint, epState);
             directory = directory.withNonUpgradedNode(nodeAddresses,
                                                       new Location(dc, rack),
                                                       nodeVersion,
                                                       nodeState,
                                                       
UUID.fromString(hostIdString));
-            if (nodeState != NodeState.LEFT)
+            if (nodeState == NodeState.JOINED)
             {
                 NodeId nodeId = directory.peerId(endpoint);
                 tokenMap = tokenMap.assignTokens(nodeId, 
getTokensIn(partitioner, epState));
@@ -385,7 +403,7 @@ public class GossipHelper
     {
         if (epstates.isEmpty())
             return false;
-        EnumSet<ApplicationState> requiredStates = EnumSet.of(DC, RACK, 
HOST_ID, TOKENS, RELEASE_VERSION);
+        EnumSet<ApplicationState> requiredStates = EnumSet.of(DC, RACK, 
HOST_ID, RELEASE_VERSION);
         for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
epstates.entrySet())
         {
             EndpointState epstate = entry.getValue();
@@ -398,4 +416,50 @@ public class GossipHelper
         }
         return true;
     }
+
+    private static boolean containsDuplicateHostIds(Map<InetAddressAndPort, 
EndpointState> epstates)
+    {
+        Set<String> hostIds = new HashSet<>();
+        for (EndpointState epstate : epstates.values())
+        {
+            String hostIdString = epstate.getApplicationState(HOST_ID).value;
+            if (hostIds.contains(hostIdString))
+                return true;
+            hostIds.add(hostIdString);
+        }
+        return false;
+    }
+
+    private static Map<InetAddressAndPort, EndpointState> 
cleanupDuplicateHostIds(Map<InetAddressAndPort, EndpointState> epstates)
+    {
+        Map<InetAddressAndPort, EndpointState> cleanEpstates = new HashMap<>();
+        Map<String, InetAddressAndPort> seenHostIds = new HashMap<>();
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
epstates.entrySet())
+        {
+            InetAddressAndPort endpoint = entry.getKey();
+            EndpointState epstate = entry.getValue();
+            String hostIdString = epstate.getApplicationState(HOST_ID).value;
+            if (seenHostIds.containsKey(hostIdString))
+            {
+                int thisGeneration = 
epstate.getHeartBeatState().getGeneration();
+
+                InetAddressAndPort seenHost = seenHostIds.get(hostIdString);
+                int seenGeneration = 
epstates.get(seenHost).getHeartBeatState().getGeneration();
+                logger.warn("Duplicate host id {} found: {} with generation {} 
and {} with generation {}, keeping the one with the newest generation",
+                            hostIdString, seenHost, seenGeneration, endpoint, 
thisGeneration);
+                if (thisGeneration > seenGeneration)
+                {
+                    cleanEpstates.remove(seenHost);
+                    cleanEpstates.put(endpoint, epstate);
+                    seenHostIds.put(hostIdString, endpoint);
+                }
+            }
+            else
+            {
+                seenHostIds.put(hostIdString, endpoint);
+                cleanEpstates.put(endpoint, epstate);
+            }
+        }
+        return cleanEpstates;
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 90af4ff79f..51ab84c520 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -157,7 +157,7 @@ public class Directory implements MetadataValue<Directory>
     {
         NodeId id = new NodeId(nextId);
         Directory updated = with(addresses, id, hostId, location, 
version).withNodeState(id, state);
-        if (state != NodeState.LEFT)
+        if (state == NodeState.JOINED)
             updated = updated.withRackAndDC(id);
         return updated;
     }
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
index e5b608994a..bc1bcc707e 100644
--- a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
@@ -36,7 +36,7 @@ public class NodeVersion implements Comparable<NodeVersion>
     public static final Serializer serializer = new Serializer();
     public static final Version CURRENT_METADATA_VERSION = Version.V6;
     public static final NodeVersion CURRENT = new NodeVersion(new 
CassandraVersion(FBUtilities.getReleaseVersionString()), 
CURRENT_METADATA_VERSION);
-    private static final CassandraVersion SINCE_VERSION = 
CassandraVersion.CASSANDRA_5_0;
+    private static final CassandraVersion SINCE_VERSION = 
CassandraVersion.CASSANDRA_5_1;
 
     public final CassandraVersion cassandraVersion;
     public final int serializationVersion;
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 69fac6f4ba..94f5dc4a06 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.migration;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -38,6 +39,8 @@ import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Startup;
 import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.ownership.TokenMap;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.net.MessageDelivery;
@@ -50,6 +53,8 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
+
 /**
  * Election process establishes initial CMS leader, from which you can further evolve cluster metadata.
  */
@@ -180,10 +185,12 @@ public class Election
         CMSInitializationRequest.Initiator currentInitiator = initiator.get();
         if (currentInitiator != null && 
Objects.equals(currentInitiator.initiator, expectedInitiator) && 
initiator.compareAndSet(currentInitiator, null))
         {
-            for (InetAddressAndPort ep : 
ClusterMetadata.current().directory.allJoinedEndpoints())
+            ClusterMetadata metadata = ClusterMetadata.current();
+            for (Map.Entry<NodeId, NodeState> entry : 
metadata.directory.states.entrySet())
             {
-                if (!ep.equals(FBUtilities.getBroadcastAddressAndPort()))
-                    messaging.send(Message.out(Verb.TCM_ABORT_MIG, 
currentInitiator), ep);
+                NodeId nodeId = entry.getKey();
+                if (!Objects.equals(metadata.myNodeId(), nodeId) && 
entry.getValue() != LEFT)
+                    messaging.send(Message.out(Verb.TCM_ABORT_MIG, 
currentInitiator), metadata.directory.endpoint(nodeId));
             }
         }
         else
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
index b712a695a2..97ff11b0e7 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
@@ -54,6 +54,9 @@ public class GossipCMSListener implements 
IEndpointStateChangeSubscriber
         if (nodeId == null)
         {
             VersionedValue hostIdValue = 
epState.getApplicationState(ApplicationState.HOST_ID);
+            if (Gossiper.isHibernate(epState))
+                return;
+
             if (hostIdValue != null)
             {
                 UUID hostId = UUID.fromString(hostIdValue.value);
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
index e774b77ebe..2b283d6905 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.tcm.sequences;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -443,6 +444,17 @@ public class BootstrapAndReplace extends 
MultiStepOperation<Epoch>
         Gossiper.instance.addLocalApplicationStates(states);
     }
 
+    public static void gossipStateToNormal(ClusterMetadata metadata, NodeId 
nodeId)
+    {
+        List<Pair<ApplicationState, VersionedValue>> states = new 
ArrayList<>();
+        VersionedValue.VersionedValueFactory valueFactory = 
StorageService.instance.valueFactory;
+        Collection<Token> tokens = metadata.tokenMap.tokens(nodeId);
+        states.add(Pair.create(ApplicationState.TOKENS, 
valueFactory.tokens(tokens)));
+        states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, 
valueFactory.normal(tokens)));
+        states.add(Pair.create(ApplicationState.STATUS, 
valueFactory.normal(tokens)));
+        Gossiper.instance.addLocalApplicationStates(states);
+    }
+
     public static class Serializer implements 
AsymmetricMetadataSerializer<MultiStepOperation<?>, BootstrapAndReplace>
     {
         public void serialize(MultiStepOperation<?> t, DataOutputPlus out, 
Version version) throws IOException
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java b/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java
index 2c4553a14f..be75538d68 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java
@@ -93,6 +93,7 @@ public class ReplaceSameAddress
             StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
                          .filter(cfs -> 
Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
                          .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(true));
+            BootstrapAndReplace.gossipStateToNormal(metadata, 
metadata.myNodeId());
             Gossiper.instance.mergeNodeToGossip(metadata.myNodeId(), metadata);
         }
     }
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index 587e06d061..27891f560a 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -50,6 +50,7 @@ public class CassandraVersion implements 
Comparable<CassandraVersion>
 
     private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP);
 
+    public static final CassandraVersion CASSANDRA_5_1 = new 
CassandraVersion("5.1").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_5_0 = new 
CassandraVersion("5.0").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_4_1 = new 
CassandraVersion("4.1").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_4_0 = new 
CassandraVersion("4.0").familyLowerBound.get();
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index b7bd1d6d66..68b542c848 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.distributed;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.function.Consumer;
 
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.shared.Versions;
 
 /**
@@ -73,6 +76,18 @@ public class UpgradeableCluster extends 
AbstractCluster<IUpgradeableInstance> im
         return builder.start();
     }
 
+    public static UpgradeableCluster create(int nodeCount, Versions.Version 
version, Consumer<IInstanceConfig> configUpdater, Consumer<Builder> 
builderUpdater, TokenSupplier tokenSupplier, Map<Integer, 
NetworkTopology.DcAndRack> nodeIdTopology) throws IOException
+    {
+        Builder builder = 
build(nodeCount).withConfig(configUpdater).withVersion(version);
+        if (tokenSupplier != null)
+            builder = builder.withTokenSupplier(tokenSupplier);
+        if (nodeIdTopology != null)
+            builder = builder.withNodeIdTopology(nodeIdTopology);
+        if (builderUpdater != null)
+            builderUpdater.accept(builder);
+        return builder.start();
+    }
+
     public static UpgradeableCluster create(int nodeCount, Versions.Version 
version) throws Throwable
     {
         return build(nodeCount).withVersion(version).start();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java
index 9e5be9639a..7712a040d9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java
@@ -43,24 +43,31 @@ public class ClusterMetadataUpgradeAssassinateTest extends 
UpgradeTestBase
             cluster.get(1).nodetoolResult("assassinate", 
"127.0.0.3").asserts().success();
         })
         .runAfterClusterUpgrade((cluster) -> {
-            checkPlacements(cluster.get(1));
-            checkPlacements(cluster.get(2));
+            checkPlacements(cluster.get(1), false);
+            checkPlacements(cluster.get(2), false);
             cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().success();
-            checkPlacements(cluster.get(1));
-            checkPlacements(cluster.get(2));
+            checkPlacements(cluster.get(1), false);
+            checkPlacements(cluster.get(2), false);
         }).run();
     }
-
-    private void checkPlacements(IUpgradeableInstance i)
+    static void checkPlacements(IUpgradeableInstance i, boolean shouldExist)
+    {
+        checkPlacements(i, "127.0.0.3", shouldExist);
+    }
+    static void checkPlacements(IUpgradeableInstance i, String host, boolean 
shouldExist)
     {
         ((IInvokableInstance) i).runOnInstance(() -> {
             ClusterMetadata metadata = ClusterMetadata.current();
-            InetAddressAndPort ep = 
InetAddressAndPort.getByNameUnchecked("127.0.0.3");
+            InetAddressAndPort ep = 
InetAddressAndPort.getByNameUnchecked(host);
             metadata.placements.asMap().forEach((key, value) -> {
-                if (Streams.concat(value.reads.endpoints.stream(),
-                                   value.writes.endpoints.stream())
-                           .anyMatch(fr -> fr.endpoints().contains(ep)))
-                    throw new IllegalStateException(ep + " should not be in 
placements " + metadata.placements);
+                if (key.isMeta())
+                    return;
+                boolean existsInPlacements = 
Streams.concat(value.reads.endpoints.stream(),
+                                                            
value.writes.endpoints.stream())
+                                                    .anyMatch(fr -> 
fr.endpoints().contains(ep));
+                if (shouldExist != existsInPlacements)
+                    throw new IllegalStateException(ep + " should" + 
(shouldExist ? "" : " not")+ " be in placements " + key + " : " + value);
+
             });
         });
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java
new file mode 100644
index 0000000000..35c108193a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.gms.ApplicationState;
+
+import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static 
org.apache.cassandra.distributed.upgrade.ClusterMetadataUpgradeAssassinateTest.checkPlacements;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterMetadataUpgradeHibernateTest extends UpgradeTestBase
+{
+    @Test
+    public void hibernateUpgradeTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2) // not node3 - we manually upgrade that below
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+            cluster.get(3).shutdown().get();
+            // stopping a fully joined node and then starting it with 
join_ring=false puts it in hibernate status - it still owns tokens:
+            withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> 
cluster.get(3).startup());
+            assertTrue(hibernating(cluster.get(1), "127.0.0.3"));
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            // manually upgrade node3 to be able to keep join_ring=false
+            cluster.get(3).shutdown().get();
+            cluster.get(3).setVersion(Versions.find().getLatest(v51));
+            assertTrue(hibernating(cluster.get(1), "127.0.0.3"));
+            withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> 
cluster.get(3).startup());
+            cluster.forEach(i -> checkPlacements(i, true));
+            assertTrue(hibernating(cluster.get(1), "127.0.0.3"));
+            cluster.forEach(i -> checkPlacements(i, true));
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().success();
+            assertTrue(hibernating(cluster.get(1), "127.0.0.3"));
+            cluster.forEach(i -> checkPlacements(i, true));
+
+            // and remove join_ring=false and make sure it is no longer 
hibernating
+            cluster.get(3).shutdown().get();
+            cluster.get(3).startup();
+            assertFalse(hibernating(cluster.get(1), "127.0.0.3"));
+            cluster.forEach(i -> checkPlacements(i, true));
+        }).run();
+    }
+
+    @Test
+    public void hibernateBadGossipUpgradeTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+            cluster.get(3).shutdown().get();
+            withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> 
cluster.get(3).startup());
+            cluster.get(3).shutdown();
+            assertTrue(hibernating(cluster.get(1), "127.0.0.3"));
+            assertTrue(hibernating(cluster.get(2), "127.0.0.3"));
+            // terrible - gossip might contain old hibernating nodes which don't exist in peers_v2. Deleting the
+            // peers_v2 rows below approximates that state, so we can verify such nodes are ignored on upgrade
+            for (int i = 1; i <= 2; i++)
+                cluster.get(i).executeInternal("delete from system.peers_v2 
where peer = '127.0.0.3'");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            checkPlacements(cluster.get(1), false);
+            checkPlacements(cluster.get(2), false);
+            // 127.0.0.3 should have been ignored on upgrade:
+            assertFalse(hibernating(cluster.get(1), "127.0.0.3"));
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().success();
+            assertFalse(hibernating(cluster.get(1), "127.0.0.3"));
+            cluster.get(2).shutdown().get();
+            cluster.get(2).startup();
+            assertFalse(hibernating(cluster.get(2), "127.0.0.3"));
+            checkPlacements(cluster.get(1), false);
+            checkPlacements(cluster.get(2), false);
+        }).run();
+    }
+
+    private static boolean hibernating(IInstance instance, String host)
+    {
+        Map<String, Map<String, String>> states = 
ClusterUtils.gossipInfo(instance);
+        Map<String, String> state = states.get('/'+host);
+        if (state == null)
+            return false;
+        String status = state.get(ApplicationState.STATUS_WITH_PORT.name());
+        return status != null && status.contains("hibernate");
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java
new file mode 100644
index 0000000000..d9c2ef8b4c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+
+import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static 
org.apache.cassandra.distributed.upgrade.ClusterMetadataUpgradeAssassinateTest.checkPlacements;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterMetadataUpgradeJoinRingTest extends UpgradeTestBase
+{
+    @Test
+    public void joinRingUpgradeTest() throws Throwable
+    {
+        TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(4);
+
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2, 3)
+        .withTokenSupplier(ts::tokens)
+        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", 
"rack0"))
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+            IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+            IUpgradeableInstance newInstance = cluster.bootstrap(nodeConfig);
+            withProperty(CassandraRelevantProperties.JOIN_RING, false, 
newInstance::startup);
+            checkGossipinfo(cluster, false);
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            checkGossipinfo(cluster, false);
+            // node4 is not upgraded yet, but should still be allowed to vote despite being join_ring=false - so initialize is expected to fail:
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().failure();
+            cluster.get(4).shutdown().get();
+            cluster.get(4).setVersion(Versions.find().getLatest(v51));
+            withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> 
cluster.get(4).startup());
+            checkGossipinfo(cluster, false);
+            checkPlacements(cluster.get(1), "127.0.0.4", false);
+
+            // before "cms initialize" - shouldn't be allowed to join
+            cluster.get(4).nodetoolResult("join").asserts().failure();
+            checkGossipinfo(cluster, false);
+            // don't allow non-joined nodes to become initial cms:
+            cluster.get(4).nodetoolResult("cms", 
"initialize").asserts().failure();
+
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().success();
+            checkGossipinfo(cluster, false);
+            ((IInvokableInstance)cluster.get(4)).runOnInstance(() -> {
+                while (ClusterMetadataService.state() == 
ClusterMetadataService.State.GOSSIP)
+                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            });
+            cluster.get(4).nodetoolResult("join").asserts().success();
+            checkGossipinfo(cluster, true);
+            checkPlacements(cluster.get(1), "127.0.0.4", true);
+
+        }).run();
+    }
+
+    private void checkGossipinfo(UpgradeableCluster cluster, boolean 
shouldBeJoined)
+    {
+        Map<String, Map<String, String>> states = 
ClusterUtils.gossipInfo(cluster.get(1));
+        Map<String, String> node4State = states.get("/127.0.0.4");
+        assertTrue(node4State != null &&  !node4State.isEmpty());
+        assertEquals(!shouldBeJoined, node4State.get("TOKENS").contains("not 
present"));
+        assertEquals(shouldBeJoined, 
node4State.containsKey("STATUS_WITH_PORT"));
+    }
+
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 44d8d35b40..6e6aaaef76 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
 import java.util.Set;
@@ -44,7 +45,9 @@ import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -159,6 +162,8 @@ public class UpgradeTestBase extends DistributedTestBase
         private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
         private Consumer<IInstanceConfig> configConsumer;
         private Consumer<UpgradeableCluster.Builder> builderConsumer;
+        private TokenSupplier tokenSupplier;
+        private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
         private UpgradeListener upgradeListener = new UpgradeListener()
         {
             @Override
@@ -334,6 +339,18 @@ public class UpgradeTestBase extends DistributedTestBase
             return this;
         }
 
+        public TestCase withTokenSupplier(TokenSupplier tokenSupplier)
+        {
+            this.tokenSupplier = tokenSupplier;
+            return this;
+        }
+
+        public TestCase withNodeIdTopology(Map<Integer, 
NetworkTopology.DcAndRack> nodeIdTopology)
+        {
+            this.nodeIdTopology = nodeIdTopology;
+            return this;
+        }
+
         public void run() throws Throwable
         {
             if (setup == null)
@@ -358,7 +375,7 @@ public class UpgradeTestBase extends DistributedTestBase
             for (TestVersions upgrade : this.upgrade)
             {
                 logger.info("testing upgrade from {} to {}", 
upgrade.initial.version, upgrade.upgradeVersions);
-                try (UpgradeableCluster cluster = 
init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, 
builderConsumer)))
+                try (UpgradeableCluster cluster = 
init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, 
builderConsumer, tokenSupplier, nodeIdTopology)))
                 {
                     setup.run(cluster);
 
diff --git 
a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java 
b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java
index 3ff7c03e83..a7ec4ad780 100644
--- a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java
+++ b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java
@@ -54,6 +54,7 @@ import static org.apache.cassandra.gms.VersionedValue.*;
 import static org.apache.cassandra.locator.InetAddressAndPort.getByName;
 import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -146,6 +147,33 @@ public class GossipHelperTest
         verifyPlacements(endpoints, metadata);
     }
 
+    @Test
+    public void duplicateHostIdTest() throws UnknownHostException
+    {
+        int nodes = 10;
+        Keyspaces kss = Keyspaces.NONE.with(KSM_NTS);
+        DistributedSchema schema = new DistributedSchema(kss);
+
+        Map<InetAddressAndPort, EndpointState> epstates = new HashMap<>();
+        UUID toDupe = UUID.randomUUID();
+        for (int i = 1; i < nodes; i++)
+        {
+            long t = i * 1000L;
+            InetAddressAndPort endpoint = getByName("127.0.0."+i);
+            Token token = t(t);
+            UUID hostId = i == 1 ? toDupe : UUID.randomUUID();
+            EndpointState endpointState = epstate(100, endpoint, endpoint, 
token, hostId, i % 2 == 1 ? "dc1" : "dc2");
+            epstates.put(endpoint, endpointState);
+        }
+
+        InetAddressAndPort oldDuplicate = getByName("127.0.0." + nodes);
+        epstates.put(oldDuplicate, epstate(50, oldDuplicate, oldDuplicate, 
t(nodes), toDupe, "dc2"));
+
+        ClusterMetadata metadata = fromEndpointStates(epstates, 
Murmur3Partitioner.instance, schema);
+        assertEquals(epstates.size() - 1, metadata.directory.addresses.size());
+        assertNull(metadata.directory.peerId(oldDuplicate));
+    }
+
     private static void verifyPlacements(Map<Integer, Token> endpoints, 
ClusterMetadata metadata) throws UnknownHostException
     {
         // quick check to make sure cm.placements is populated
@@ -185,8 +213,11 @@ public class GossipHelperTest
             assertTrue("endpoint "+ep+" should be in " + eps, 
eps.contains(ep));
         }
     }
-
     private static EndpointState epstate(InetAddressAndPort internalAddress, 
InetAddressAndPort nativeAddress, Token token, UUID hostId, String dc)
+    {
+        return epstate(1, internalAddress, nativeAddress, token, hostId, dc);
+    }
+    private static EndpointState epstate(int generation, InetAddressAndPort 
internalAddress, InetAddressAndPort nativeAddress, Token token, UUID hostId, 
String dc)
     {
         Map<ApplicationState, VersionedValue> versionedValues = new 
EnumMap<>(ApplicationState.class);
         versionedValues.put(STATUS_WITH_PORT, 
vvf.normal(Collections.singleton(token)));
@@ -197,7 +228,7 @@ public class GossipHelperTest
         versionedValues.put(RELEASE_VERSION, vvf.releaseVersion("3.0.24"));
         versionedValues.put(NATIVE_ADDRESS_AND_PORT, 
vvf.nativeaddressAndPort(nativeAddress));
         versionedValues.put(INTERNAL_ADDRESS_AND_PORT, 
vvf.internalAddressAndPort(internalAddress));
-        return new EndpointState(new HeartBeatState(1, 1), versionedValues);
+        return new EndpointState(new HeartBeatState(generation, 1), 
versionedValues);
     }
 
     private EndpointState withState(String status) throws UnknownHostException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to