This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4318e74180d710844b075fedc33e8ca008f87f63 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Mar 24 15:15:53 2025 +0100 Various gossip to TCM upgrade fixes patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20483 --- CHANGES.txt | 1 + .../org/apache/cassandra/gms/FailureDetector.java | 29 +++-- src/java/org/apache/cassandra/gms/Gossiper.java | 42 +++++++- .../apache/cassandra/service/StorageService.java | 3 +- .../cassandra/tcm/ClusterMetadataService.java | 7 ++ src/java/org/apache/cassandra/tcm/Startup.java | 25 +++-- .../cassandra/tcm/compatibility/GossipHelper.java | 72 ++++++++++++- .../apache/cassandra/tcm/membership/Directory.java | 2 +- .../cassandra/tcm/membership/NodeVersion.java | 2 +- .../apache/cassandra/tcm/migration/Election.java | 13 ++- .../cassandra/tcm/migration/GossipCMSListener.java | 3 + .../tcm/sequences/BootstrapAndReplace.java | 12 +++ .../tcm/sequences/ReplaceSameAddress.java | 1 + .../apache/cassandra/utils/CassandraVersion.java | 1 + .../cassandra/distributed/UpgradeableCluster.java | 15 +++ .../ClusterMetadataUpgradeAssassinateTest.java | 29 +++-- .../ClusterMetadataUpgradeHibernateTest.java | 119 +++++++++++++++++++++ .../ClusterMetadataUpgradeJoinRingTest.java | 102 ++++++++++++++++++ .../distributed/upgrade/UpgradeTestBase.java | 19 +++- .../tcm/compatibility/GossipHelperTest.java | 35 +++++- 20 files changed, 488 insertions(+), 44 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 864e664620..0449ba45d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Various gossip to TCM upgrade fixes (CASSANDRA-20483) * Add nodetool command to abort failed nodetool cms initialize (CASSANDRA-20482) * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467) * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346) diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index 34c98da4b7..49b2089297 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -47,12 +47,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; @@ -150,7 +151,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet()) { sb.append(resolveIp ? entry.getKey().getHostName(withPort) : entry.getKey().toString(withPort)).append("\n"); - appendEndpointState(sb, entry.getValue()); + appendEndpointState(sb, entry.getKey(), entry.getValue()); } return sb.toString(); } @@ -242,12 +243,13 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public String getEndpointState(String address) throws UnknownHostException { StringBuilder sb = new StringBuilder(); - EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(InetAddressAndPort.getByName(address)); - appendEndpointState(sb, endpointState); + InetAddressAndPort endpoint = InetAddressAndPort.getByName(address); + EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + appendEndpointState(sb, endpoint, endpointState); return sb.toString(); } - private void appendEndpointState(StringBuilder sb, EndpointState endpointState) + private void appendEndpointState(StringBuilder sb, InetAddressAndPort endpoint, EndpointState endpointState) { sb.append(" generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n"); sb.append(" heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n"); @@ -258,9 +260,20 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean sb.append(" ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n"); } ClusterMetadata metadata = ClusterMetadata.current(); - NodeId nodeId = metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort()); - List<Token> tokens = metadata.tokenMap.tokens(nodeId); - if (tokens != null && !tokens.isEmpty()) + NodeId nodeId = metadata.directory.peerId(endpoint); + boolean foundTokens = false; + if (nodeId != null) + { + foundTokens = !metadata.tokenMap.tokens(nodeId).isEmpty(); + if (!foundTokens) + { + MultiStepOperation<?> mso = metadata.inProgressSequences.get(nodeId); + if (mso instanceof BootstrapAndReplace) + foundTokens = true; + } + } + + if (foundTokens) sb.append(" TOKENS:").append(metadata.epoch.getEpoch()).append(":<hidden>\n"); else sb.append(" TOKENS: not present\n"); diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 5274a57ffb..14cc5f5ada 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -454,6 +454,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, return state.equals(VersionedValue.SHUTDOWN); } + public static boolean isHibernate(EndpointState epState) + { + VersionedValue versionedValue = epState.getApplicationState(ApplicationState.STATUS_WITH_PORT); + if (versionedValue == null) + versionedValue = epState.getApplicationState(ApplicationState.STATUS); + return isHibernate(versionedValue); + } + + public static boolean isHibernate(VersionedValue vv) + { + if (vv == null) + return false; + + String value = vv.value; + String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + String state = pieces[0]; + return state.equals(VersionedValue.HIBERNATE); + } + public static void runInGossipStageBlocking(Runnable runnable) { // run immediately if we're already in the gossip stage @@ -2106,10 +2126,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, */ public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata) { - mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId)); + mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId), false); + } + public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, boolean forceHibernate) + { + mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId), forceHibernate); } - public void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collection<Token> tokens) + { + mergeNodeToGossip(nodeId, metadata, tokens, false); + } + + private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collection<Token> tokens, boolean forceHibernate) { taskLock.lock(); try @@ -2156,7 +2184,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, newValue = valueFactory.hostId(uuid); break; case TOKENS: - if (tokens != null) + if (tokens != null && !tokens.isEmpty()) newValue = valueFactory.tokens(tokens); break; case INTERNAL_ADDRESS_AND_PORT: @@ -2175,6 +2203,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, // In this case, the app state will be set to `hibernate` by StorageService, so // don't set it here as nodeStateToStatus only considers persistent states (e.g. // ones stored in ClusterMetadata), it isn't aware of transient states like hibernate. + // forceHibernate can be true when upgrading from pre-tcm versions - if a node is hibernating + // we have no state for this in cluster metadata, so we need to explicitly keep that from + // the pre-upgrade gossip states + if (forceHibernate) + { + newValue = valueFactory.hibernate(true); + break; + } if (isLocal && !StorageService.instance.shouldJoinRing()) break; newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 53f815a586..812dc31cd4 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2084,8 +2084,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // normal STATUS. if (state == ApplicationState.STATUS_WITH_PORT) { - String[] pieces = splitValue(value); - if (pieces[0].equals(VersionedValue.HIBERNATE)) + if (Gossiper.isHibernate(value)) { logger.info("Node {} state jump to hibernate", endpoint); Gossiper.runInGossipStageBlocking(() -> { diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 4c6eed11b4..3e3d8389ae 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -318,6 +318,13 @@ public class ClusterMetadataService } ClusterMetadata metadata = metadata(); + if (metadata.myNodeState() != NodeState.JOINED) + { + String msg = String.format("Initial CMS node needs to be fully joined, not: %s", metadata.myNodeState()); + logger.error(msg); + throw new IllegalStateException(msg); + } + Set<InetAddressAndPort> existingMembers = metadata.fullCMSMembers(); if (!metadata.directory.allAddresses().containsAll(ignored)) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index edd8734fca..d17b687698 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -74,6 +74,7 @@ import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; import static org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates; import static org.apache.cassandra.tcm.membership.NodeState.JOINED; +import static org.apache.cassandra.tcm.membership.NodeState.LEFT; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; /** @@ -313,18 +314,26 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; logger.debug("Created initial ClusterMetadata {}", initial); ClusterMetadataService.instance().setFromGossip(initial); Gossiper.instance.clearUnsafe(); - if (switchIp != null) - { - // quarantine the old ip to make sure it doesn't get re-added via gossip - InetAddressAndPort removeEp = switchIp; - Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(removeEp)); - } + + // find any endpoints that were ignored on upgrade and make sure they don't get re-added in gossip + InetAddressAndPort removeEp = switchIp; + Gossiper.runInGossipStageBlocking(() -> { + if (removeEp != null) + Gossiper.instance.removeEndpoint(removeEp); + for (InetAddressAndPort ep : epStates.keySet()) + { + if (initial.directory.peerId(ep) == null) + Gossiper.instance.removeEndpoint(ep); // just quarantines the ep - endpoint states should be empty + // here (this is run before Gossiper is started) + } + }); + Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); for (Map.Entry<NodeId, NodeState> entry : initial.directory.states.entrySet()) { InetAddressAndPort ep = initial.directory.addresses.get(entry.getKey()).broadcastAddress; - if (entry.getValue() != NodeState.LEFT) - Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial); + if (entry.getValue() != LEFT) + Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial, Gossiper.isHibernate(epStates.get(ep))); else Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.endpointStateMap.put(ep, epStates.get(ep))); } diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 0e1acbc4ea..1a555fce4d 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -87,6 +87,7 @@ import static org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION; import static org.apache.cassandra.gms.ApplicationState.RPC_ADDRESS; import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT; import static org.apache.cassandra.gms.ApplicationState.TOKENS; +import static org.apache.cassandra.gms.Gossiper.isHibernate; import static org.apache.cassandra.gms.Gossiper.isShutdown; import static org.apache.cassandra.locator.InetAddressAndPort.getByName; import static org.apache.cassandra.locator.InetAddressAndPort.getByNameOverrideDefaults; @@ -123,6 +124,8 @@ public class GossipHelper case JOINED: if (isShutdown(oldValue)) status = valueFactory.shutdown(true); + else if (isHibernate(oldValue)) + status = valueFactory.hibernate(true); else status = valueFactory.normal(tokens); break; @@ -224,10 +227,13 @@ public class GossipHelper String status = epState.getStatus(); if (status.equals(VersionedValue.STATUS_NORMAL) || - status.equals(VersionedValue.SHUTDOWN)) + status.equals(VersionedValue.SHUTDOWN) || + status.equals(VersionedValue.HIBERNATE)) return NodeState.JOINED; if (status.equals(VersionedValue.STATUS_LEFT)) return NodeState.LEFT; + if (status.isEmpty()) + return NodeState.REGISTERED; throw new IllegalStateException("Can't upgrade the first node when STATUS = " + status + " for node " + endpoint); } @@ -332,6 +338,12 @@ public class GossipHelper { Directory directory = new Directory().withLastModified(Epoch.UPGRADE_GOSSIP); TokenMap tokenMap = new TokenMap(partitioner).withLastModified(Epoch.UPGRADE_GOSSIP); + + // gossip can contain old hosts with duplicate host ids during upgrades from pre-TCM versions. We need to clean + // those up during upgrades since TCM is more strict. Here we simply keep the host with the newest gossip + // generation if there is a duplicate hostid. + if (containsDuplicateHostIds(epStates)) + epStates = cleanupDuplicateHostIds(epStates); List<InetAddressAndPort> sortedEps = Lists.newArrayList(epStates.keySet()); Collections.sort(sortedEps); Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(); @@ -344,14 +356,20 @@ public class GossipHelper NodeAddresses nodeAddresses = getAddressesFromEndpointState(endpoint, epState); NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, epState); assert hostIdString != null; - NodeState nodeState = toNodeState(endpoint, epState); + // some clusters have old, removed hibernating endpoints in gossip, ignore these if they don't exist in our system.peers_v2 table + if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && Gossiper.isHibernate(epState) && !SystemKeyspace.loadTokens().containsKey(endpoint)) + { + logger.info("Ignoring endpoint {} with endpoint states {} since it is missing from system.peers_v2", endpoint, epState); + continue; + } + NodeState nodeState = toNodeState(endpoint, epState); directory = directory.withNonUpgradedNode(nodeAddresses, new Location(dc, rack), nodeVersion, nodeState, UUID.fromString(hostIdString)); - if (nodeState != NodeState.LEFT) + if (nodeState == NodeState.JOINED) { NodeId nodeId = directory.peerId(endpoint); tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState)); @@ -385,7 +403,7 @@ public class GossipHelper { if (epstates.isEmpty()) return false; - EnumSet<ApplicationState> requiredStates = EnumSet.of(DC, RACK, HOST_ID, TOKENS, RELEASE_VERSION); + EnumSet<ApplicationState> requiredStates = EnumSet.of(DC, RACK, HOST_ID, RELEASE_VERSION); for (Map.Entry<InetAddressAndPort, EndpointState> entry : epstates.entrySet()) { EndpointState epstate = entry.getValue(); @@ -398,4 +416,50 @@ public class GossipHelper } return true; } + + private static boolean containsDuplicateHostIds(Map<InetAddressAndPort, EndpointState> epstates) + { + Set<String> hostIds = new HashSet<>(); + for (EndpointState epstate : epstates.values()) + { + String hostIdString = epstate.getApplicationState(HOST_ID).value; + if (hostIds.contains(hostIdString)) + return true; + hostIds.add(hostIdString); + } + return false; + } + + private static Map<InetAddressAndPort, EndpointState> cleanupDuplicateHostIds(Map<InetAddressAndPort, EndpointState> epstates) + { + Map<InetAddressAndPort, EndpointState> cleanEpstates = new HashMap<>(); + Map<String, InetAddressAndPort> seenHostIds = new HashMap<>(); + for (Map.Entry<InetAddressAndPort, EndpointState> entry : epstates.entrySet()) + { + InetAddressAndPort endpoint = entry.getKey(); + EndpointState epstate = entry.getValue(); + String hostIdString = epstate.getApplicationState(HOST_ID).value; + if (seenHostIds.containsKey(hostIdString)) + { + int thisGeneration = epstate.getHeartBeatState().getGeneration(); + + InetAddressAndPort seenHost = seenHostIds.get(hostIdString); + int seenGeneration = epstates.get(seenHost).getHeartBeatState().getGeneration(); + logger.warn("Duplicate host id {} found: {} with generation {} and {} with generation {}, keeping the one with the newest generation", + hostIdString, seenHost, seenGeneration, endpoint, thisGeneration); + if (thisGeneration > seenGeneration) + { + cleanEpstates.remove(seenHost); + cleanEpstates.put(endpoint, epstate); + seenHostIds.put(hostIdString, endpoint); + } + } + else + { + seenHostIds.put(hostIdString, endpoint); + cleanEpstates.put(endpoint, epstate); + } + } + return cleanEpstates; + } } diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 90af4ff79f..51ab84c520 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -157,7 +157,7 @@ public class Directory implements MetadataValue<Directory> { NodeId id = new NodeId(nextId); Directory updated = with(addresses, id, hostId, location, version).withNodeState(id, state); - if (state != NodeState.LEFT) + if (state == NodeState.JOINED) updated = updated.withRackAndDC(id); return updated; } diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java index e5b608994a..bc1bcc707e 100644 --- a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java +++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java @@ -36,7 +36,7 @@ public class NodeVersion implements Comparable<NodeVersion> public static final Serializer serializer = new Serializer(); public static final Version CURRENT_METADATA_VERSION = Version.V6; public static final NodeVersion CURRENT = new NodeVersion(new CassandraVersion(FBUtilities.getReleaseVersionString()), CURRENT_METADATA_VERSION); - private static final CassandraVersion SINCE_VERSION = CassandraVersion.CASSANDRA_5_0; + private static final CassandraVersion SINCE_VERSION = CassandraVersion.CASSANDRA_5_1; public final CassandraVersion cassandraVersion; public final int serializationVersion; diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 69fac6f4ba..94f5dc4a06 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.migration; import java.io.IOException; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -38,6 +39,8 @@ import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.net.MessageDelivery; @@ -50,6 +53,8 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.tcm.membership.NodeState.LEFT; + /** * Election process establishes initial CMS leader, from which you can further evolve cluster metadata. */ @@ -180,10 +185,12 @@ public class Election CMSInitializationRequest.Initiator currentInitiator = initiator.get(); if (currentInitiator != null && Objects.equals(currentInitiator.initiator, expectedInitiator) && initiator.compareAndSet(currentInitiator, null)) { - for (InetAddressAndPort ep : ClusterMetadata.current().directory.allJoinedEndpoints()) + ClusterMetadata metadata = ClusterMetadata.current(); + for (Map.Entry<NodeId, NodeState> entry : metadata.directory.states.entrySet()) { - if (!ep.equals(FBUtilities.getBroadcastAddressAndPort())) - messaging.send(Message.out(Verb.TCM_ABORT_MIG, currentInitiator), ep); + NodeId nodeId = entry.getKey(); + if (!Objects.equals(metadata.myNodeId(), nodeId) && entry.getValue() != LEFT) + messaging.send(Message.out(Verb.TCM_ABORT_MIG, currentInitiator), metadata.directory.endpoint(nodeId)); } } else diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java index b712a695a2..97ff11b0e7 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java @@ -54,6 +54,9 @@ public class GossipCMSListener implements IEndpointStateChangeSubscriber if (nodeId == null) { VersionedValue hostIdValue = epState.getApplicationState(ApplicationState.HOST_ID); + if (Gossiper.isHibernate(epState)) + return; + if (hostIdValue != null) { UUID hostId = UUID.fromString(hostIdValue.value); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java index e774b77ebe..2b283d6905 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java @@ -20,6 +20,7 @@ package org.apache.cassandra.tcm.sequences; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -443,6 +444,17 @@ public class BootstrapAndReplace extends MultiStepOperation<Epoch> Gossiper.instance.addLocalApplicationStates(states); } + public static void gossipStateToNormal(ClusterMetadata metadata, NodeId nodeId) + { + List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>(); + VersionedValue.VersionedValueFactory valueFactory = StorageService.instance.valueFactory; + Collection<Token> tokens = metadata.tokenMap.tokens(nodeId); + states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokens))); + states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens))); + Gossiper.instance.addLocalApplicationStates(states); + } + public static class Serializer implements AsymmetricMetadataSerializer<MultiStepOperation<?>, BootstrapAndReplace> { public void serialize(MultiStepOperation<?> t, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java b/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java index 2c4553a14f..be75538d68 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ReplaceSameAddress.java @@ -93,6 +93,7 @@ public class ReplaceSameAddress StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false) .filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName())) .forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true)); + BootstrapAndReplace.gossipStateToNormal(metadata, metadata.myNodeId()); Gossiper.instance.mergeNodeToGossip(metadata.myNodeId(), metadata); } } diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java index 587e06d061..27891f560a 100644 --- a/src/java/org/apache/cassandra/utils/CassandraVersion.java +++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java @@ -50,6 +50,7 @@ public class CassandraVersion implements Comparable<CassandraVersion> private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP); + public static final CassandraVersion CASSANDRA_5_1 = new CassandraVersion("5.1").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_5_0 = new CassandraVersion("5.0").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_4_1 = new CassandraVersion("4.1").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_4_0 = new CassandraVersion("4.0").familyLowerBound.get(); diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java index b7bd1d6d66..68b542c848 100644 --- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java @@ -19,11 +19,14 @@ package org.apache.cassandra.distributed; import java.io.IOException; +import java.util.Map; import java.util.function.Consumer; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.impl.AbstractCluster; +import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.shared.Versions; /** @@ -73,6 +76,18 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im return builder.start(); } + public static UpgradeableCluster create(int nodeCount, Versions.Version version, Consumer<IInstanceConfig> configUpdater, Consumer<Builder> builderUpdater, TokenSupplier tokenSupplier, Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) throws IOException + { + Builder builder = build(nodeCount).withConfig(configUpdater).withVersion(version); + if (tokenSupplier != null) + builder = builder.withTokenSupplier(tokenSupplier); + if (nodeIdTopology != null) + builder = builder.withNodeIdTopology(nodeIdTopology); + if (builderUpdater != null) + builderUpdater.accept(builder); + return builder.start(); + } + public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws Throwable { return build(nodeCount).withVersion(version).start(); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java index 9e5be9639a..7712a040d9 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java @@ -43,24 +43,31 @@ public class ClusterMetadataUpgradeAssassinateTest extends UpgradeTestBase cluster.get(1).nodetoolResult("assassinate", "127.0.0.3").asserts().success(); }) .runAfterClusterUpgrade((cluster) -> { - checkPlacements(cluster.get(1)); - checkPlacements(cluster.get(2)); + checkPlacements(cluster.get(1), false); + checkPlacements(cluster.get(2), false); cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); - checkPlacements(cluster.get(1)); - checkPlacements(cluster.get(2)); + checkPlacements(cluster.get(1), false); + checkPlacements(cluster.get(2), false); }).run(); } - - private void checkPlacements(IUpgradeableInstance i) + static void checkPlacements(IUpgradeableInstance i, boolean shouldExist) + { + checkPlacements(i, "127.0.0.3", shouldExist); + } + static void checkPlacements(IUpgradeableInstance i, String host, boolean shouldExist) { ((IInvokableInstance) i).runOnInstance(() -> { ClusterMetadata metadata = ClusterMetadata.current(); - InetAddressAndPort ep = InetAddressAndPort.getByNameUnchecked("127.0.0.3"); + InetAddressAndPort ep = InetAddressAndPort.getByNameUnchecked(host); metadata.placements.asMap().forEach((key, value) -> { - if (Streams.concat(value.reads.endpoints.stream(), - value.writes.endpoints.stream()) - .anyMatch(fr -> fr.endpoints().contains(ep))) - throw new IllegalStateException(ep + " should not be in placements " + metadata.placements); + if (key.isMeta()) + return; + boolean existsInPlacements = Streams.concat(value.reads.endpoints.stream(), + value.writes.endpoints.stream()) + .anyMatch(fr -> fr.endpoints().contains(ep)); + if (shouldExist != existsInPlacements) + throw new IllegalStateException(ep + " should" + (shouldExist ? "" : " not")+ " be in placements " + key + " : " + value); + }); }); } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java new file mode 100644 index 0000000000..35c108193a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHibernateTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.Map; + +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.gms.ApplicationState; + +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.upgrade.ClusterMetadataUpgradeAssassinateTest.checkPlacements; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClusterMetadataUpgradeHibernateTest extends UpgradeTestBase +{ + @Test + public void hibernateUpgradeTest() throws Throwable + { + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2) // not node3 - we manually upgrade that below + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + cluster.get(3).shutdown().get(); + // stopping a fully joined node and then starting it with join_ring=false puts it in hibernate status - it still owns tokens: + withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> cluster.get(3).startup()); + assertTrue(hibernating(cluster.get(1), "127.0.0.3")); + }) + .runAfterClusterUpgrade((cluster) -> { + // manually upgrade node3 to be able to keep join_ring=false + cluster.get(3).shutdown().get(); + cluster.get(3).setVersion(Versions.find().getLatest(v51)); + assertTrue(hibernating(cluster.get(1), "127.0.0.3")); + withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> cluster.get(3).startup()); + cluster.forEach(i -> checkPlacements(i, true)); + assertTrue(hibernating(cluster.get(1), "127.0.0.3")); + cluster.forEach(i -> checkPlacements(i, true)); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + assertTrue(hibernating(cluster.get(1), "127.0.0.3")); + cluster.forEach(i -> checkPlacements(i, true)); + + // and remove join_ring=false and make sure it is no longer hibernating + cluster.get(3).shutdown().get(); + cluster.get(3).startup(); + assertFalse(hibernating(cluster.get(1), "127.0.0.3")); + cluster.forEach(i -> checkPlacements(i, true)); + }).run(); + } + + @Test + public void hibernateBadGossipUpgradeTest() throws Throwable + { + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + cluster.get(3).shutdown().get(); + withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> cluster.get(3).startup()); + cluster.get(3).shutdown(); + assertTrue(hibernating(cluster.get(1), "127.0.0.3")); + assertTrue(hibernating(cluster.get(2), "127.0.0.3")); + // terrible - we might have old hibernating nodes in gossip which don't exist in peers_v2 - this + // is an approximation of that state to be able to upgrade and ignore these nodes + for (int i = 1; i <= 2; i++) + cluster.get(i).executeInternal("delete from system.peers_v2 where peer = '127.0.0.3'"); + }) + .runAfterClusterUpgrade((cluster) -> { + checkPlacements(cluster.get(1), false); + checkPlacements(cluster.get(2), false); + // 127.0.0.3 should have been ignored on upgrade: + assertFalse(hibernating(cluster.get(1), "127.0.0.3")); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + assertFalse(hibernating(cluster.get(1), "127.0.0.3")); + cluster.get(2).shutdown().get(); + cluster.get(2).startup(); + assertFalse(hibernating(cluster.get(2), "127.0.0.3")); + checkPlacements(cluster.get(1), false); + checkPlacements(cluster.get(2), false); + }).run(); + } + + private static boolean hibernating(IInstance instance, String host) + { + Map<String, Map<String, String>> states = ClusterUtils.gossipInfo(instance); + Map<String, String> state = states.get('/'+host); + if (state == null) + return false; + String status = state.get(ApplicationState.STATUS_WITH_PORT.name()); + return status != null && status.contains("hibernate"); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java new file mode 100644 index 0000000000..d9c2ef8b4c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeJoinRingTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.tcm.ClusterMetadataService; + +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.upgrade.ClusterMetadataUpgradeAssassinateTest.checkPlacements; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ClusterMetadataUpgradeJoinRingTest extends UpgradeTestBase +{ + @Test + public void joinRingUpgradeTest() throws Throwable + { + TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(4); + + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2, 3) + .withTokenSupplier(ts::tokens) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0")) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + IInstanceConfig nodeConfig = cluster.newInstanceConfig(); + IUpgradeableInstance newInstance = cluster.bootstrap(nodeConfig); + withProperty(CassandraRelevantProperties.JOIN_RING, false, newInstance::startup); + checkGossipinfo(cluster, false); + }) + .runAfterClusterUpgrade((cluster) -> { + checkGossipinfo(cluster, false); + // node4 not upgraded yet - should be allowed to vote despite being join_ring=false: + cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure(); + cluster.get(4).shutdown().get(); + cluster.get(4).setVersion(Versions.find().getLatest(v51)); + withProperty(CassandraRelevantProperties.JOIN_RING, false, () -> cluster.get(4).startup()); + checkGossipinfo(cluster, false); + checkPlacements(cluster.get(1), "127.0.0.4", false); + + // before "cms initialize" - shouldn't be allowed to join + cluster.get(4).nodetoolResult("join").asserts().failure(); + checkGossipinfo(cluster, false); + // don't allow non-joined nodes to become initial cms: + cluster.get(4).nodetoolResult("cms", "initialize").asserts().failure(); + + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + checkGossipinfo(cluster, false); + ((IInvokableInstance)cluster.get(4)).runOnInstance(() -> { + while (ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP) + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + }); + cluster.get(4).nodetoolResult("join").asserts().success(); + checkGossipinfo(cluster, true); + checkPlacements(cluster.get(1), "127.0.0.4", true); + + }).run(); + } + + private void checkGossipinfo(UpgradeableCluster cluster, boolean shouldBeJoined) + { + Map<String, Map<String, String>> states = ClusterUtils.gossipInfo(cluster.get(1)); + Map<String, String> node4State = states.get("/127.0.0.4"); + assertTrue(node4State != null && !node4State.isEmpty()); + assertEquals(!shouldBeJoined, node4State.get("TOKENS").contains("not present")); + assertEquals(shouldBeJoined, node4State.containsKey("STATUS_WITH_PORT")); + } + +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java index 44d8d35b40..6e6aaaef76 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Objects; import java.util.Set; @@ -44,7 +45,9 @@ import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.DistributedTestBase; +import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.shared.ThrowingRunnable; import org.apache.cassandra.distributed.shared.Versions; import org.apache.cassandra.utils.ByteBufferUtil; @@ -159,6 +162,8 @@ public class UpgradeTestBase extends DistributedTestBase private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>(); private Consumer<IInstanceConfig> configConsumer; private Consumer<UpgradeableCluster.Builder> builderConsumer; + private TokenSupplier tokenSupplier; + private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology; private UpgradeListener upgradeListener = new UpgradeListener() { @Override @@ -334,6 +339,18 @@ public class UpgradeTestBase extends DistributedTestBase return this; } + public TestCase withTokenSupplier(TokenSupplier tokenSupplier) + { + this.tokenSupplier = tokenSupplier; + return this; + } + + public TestCase withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) + { + this.nodeIdTopology = nodeIdTopology; + return this; + } + public void run() throws Throwable { if (setup == null) @@ -358,7 +375,7 @@ public class UpgradeTestBase extends DistributedTestBase for (TestVersions upgrade : this.upgrade) { logger.info("testing upgrade from {} to {}", upgrade.initial.version, upgrade.upgradeVersions); - try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer))) + try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer, tokenSupplier, nodeIdTopology))) { setup.run(cluster); diff --git a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java index 3ff7c03e83..a7ec4ad780 100644 --- a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java +++ b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java @@ -54,6 +54,7 @@ import static org.apache.cassandra.gms.VersionedValue.*; import static org.apache.cassandra.locator.InetAddressAndPort.getByName; import static org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -146,6 +147,33 @@ public class GossipHelperTest verifyPlacements(endpoints, metadata); } + @Test + public void duplicateHostIdTest() throws UnknownHostException + { + int nodes = 10; + Keyspaces kss = Keyspaces.NONE.with(KSM_NTS); + DistributedSchema schema = new DistributedSchema(kss); + + Map<InetAddressAndPort, EndpointState> epstates = new HashMap<>(); + UUID toDupe = UUID.randomUUID(); + for (int i = 1; i < nodes; i++) + { + long t = i * 1000L; + InetAddressAndPort endpoint = getByName("127.0.0."+i); + Token token = t(t); + UUID hostId = i == 1 ? toDupe : UUID.randomUUID(); + EndpointState endpointState = epstate(100, endpoint, endpoint, token, hostId, i % 2 == 1 ? "dc1" : "dc2"); + epstates.put(endpoint, endpointState); + } + + InetAddressAndPort oldDuplicate = getByName("127.0.0." + nodes); + epstates.put(oldDuplicate, epstate(50, oldDuplicate, oldDuplicate, t(nodes), toDupe, "dc2")); + + ClusterMetadata metadata = fromEndpointStates(epstates, Murmur3Partitioner.instance, schema); + assertEquals(epstates.size() - 1, metadata.directory.addresses.size()); + assertNull(metadata.directory.peerId(oldDuplicate)); + } + private static void verifyPlacements(Map<Integer, Token> endpoints, ClusterMetadata metadata) throws UnknownHostException { // quick check to make sure cm.placements is populated @@ -185,8 +213,11 @@ public class GossipHelperTest assertTrue("endpoint "+ep+" should be in " + eps, eps.contains(ep)); } } - private static EndpointState epstate(InetAddressAndPort internalAddress, InetAddressAndPort nativeAddress, Token token, UUID hostId, String dc) + { + return epstate(1, internalAddress, nativeAddress, token, hostId, dc); + } + private static EndpointState epstate(int generation, InetAddressAndPort internalAddress, InetAddressAndPort nativeAddress, Token token, UUID hostId, String dc) { Map<ApplicationState, VersionedValue> versionedValues = new EnumMap<>(ApplicationState.class); versionedValues.put(STATUS_WITH_PORT, vvf.normal(Collections.singleton(token))); @@ -197,7 +228,7 @@ public class GossipHelperTest versionedValues.put(RELEASE_VERSION, vvf.releaseVersion("3.0.24")); versionedValues.put(NATIVE_ADDRESS_AND_PORT, vvf.nativeaddressAndPort(nativeAddress)); versionedValues.put(INTERNAL_ADDRESS_AND_PORT, vvf.internalAddressAndPort(internalAddress)); - return new EndpointState(new HeartBeatState(1, 1), versionedValues); + return new EndpointState(new HeartBeatState(generation, 1), versionedValues); } private EndpointState withState(String status) throws UnknownHostException --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org