This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 209288cebecba15b320ccd7dd041adab78ce0ff7 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Mon Nov 10 09:15:42 2025 +0000 Reject PrepareJoin if tokens already assigned Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21006 --- CHANGES.txt | 1 + .../org/apache/cassandra/tcm/ClusterMetadata.java | 5 +- .../apache/cassandra/tcm/membership/Directory.java | 3 +- .../cassandra/tcm/transformations/PrepareJoin.java | 13 +++ .../service/accord/AccordTopologyUtils.java | 2 +- .../reads/repair/AbstractReadRepairTest.java | 6 +- .../tcm/transformations/PrepareJoinTest.java | 130 +++++++++++++++++++++ .../tcm/transformations/PrepareLeaveTest.java | 26 ++++- 8 files changed, 173 insertions(+), 13 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7b7ff72e81..f0d05ba26e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006) * Don't update registration status if node state for decommissioned peer is found with the same address (CASSANDRA-21005) * Avoid NPE when meta keyspace placements are empty before CMS is initialized (CASSANDRA-21004) * Gossip entries for hibernating non-members don't block truncate (CASSANDRA-21003) diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 65cb869faf..8c962ae299 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -480,9 +480,10 @@ public class ClusterMetadata return this; } - public Transformer register(NodeId nodeId, NodeAddresses addresses, Location location, NodeVersion version) + @VisibleForTesting + public Transformer unsafeRegisterForTesting(NodeId nodeId, NodeAddresses addresses, Location location, NodeVersion version) { - directory = directory.with(nodeId, addresses, location, version); + directory = directory.unsafeWithNodeForTesting(nodeId, addresses, location, version); return this; } diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index b7d8684976..8e73f7f341 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -178,7 +178,8 @@ public class Directory implements MetadataValue<Directory> return with(addresses, location, CURRENT); } - public Directory with(NodeId id, NodeAddresses addresses, Location location, NodeVersion nodeVersion) + @VisibleForTesting + public Directory unsafeWithNodeForTesting(NodeId id, NodeAddresses addresses, Location location, NodeVersion nodeVersion) { return with(addresses, id, id.toUUID(), location, nodeVersion); } diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java index 02fd3d9483..55220e0cea 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java +++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.transformations; import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableSet; @@ -30,6 +31,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Transformation; @@ -132,6 +134,17 @@ public class PrepareJoin implements Transformation if (!ALLOWED_STATES.contains(prev.directory.peerState(nodeId))) return new Rejected(INVALID, String.format("Rejecting this plan as the node %s is in state %s", nodeId, prev.directory.peerState(nodeId))); + Set<Token> alreadyAssigned = prev.tokenMap.tokens().stream().filter(tokens::contains).collect(Collectors.toSet()); + if (!alreadyAssigned.isEmpty()) + { + String assignedString = alreadyAssigned.stream() + .map(t -> { + NodeId n = prev.tokenMap.owner(t); + InetAddressAndPort e = prev.directory.endpoint(n); + return String.format("%s (node %s|%s)", t, n.id(), e); + }).collect(Collectors.joining(",")); + return new Rejected(INVALID, String.format("Rejecting this plan as some tokens are already assigned: [%s]", assignedString)); + } PlacementTransitionPlan transitionPlan = placementProvider.planForJoin(prev, nodeId, tokens, prev.schema.getKeyspaces()); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java index e4a2a4791c..2660f07096 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java @@ -81,7 +81,7 @@ public class AccordTopologyUtils NodeId nodeId = nodeId(node); InetAddressAndPort ep = ep(node); NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, ep); - transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT); + transformer.unsafeRegisterForTesting(nodeId, addresses, LOCATION, NodeVersion.CURRENT); transformer.withNodeState(nodeId, NodeState.JOINED); transformer.proposeToken(nodeId, Collections.singleton(token)); transformer.addToRackAndDC(nodeId); diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java index 4a6d31447c..4e206fe5b5 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java @@ -254,9 +254,9 @@ public abstract class AbstractReadRepairTest ClusterMetadataTestHelper.addEndpoint(replica1.endpoint(), new BytesToken(new byte[] { 0 }), dataCenter1, rack); ClusterMetadataTestHelper.addEndpoint(replica2.endpoint(), new BytesToken(new byte[] { 1 }), dataCenter1, rack); ClusterMetadataTestHelper.addEndpoint(replica3.endpoint(), new BytesToken(new byte[] { 2 }), dataCenter1, rack); - ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new BytesToken(new byte[] { 0 }), dataCenter2, rack); - ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new BytesToken(new byte[] { 1 }), dataCenter2, rack); - ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new BytesToken(new byte[] { 2 }), dataCenter2, rack); + ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new BytesToken(new byte[] { 3 }), dataCenter2, rack); + ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new BytesToken(new byte[] { 4 }), dataCenter2, rack); + ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new BytesToken(new byte[] { 5 }), dataCenter2, rack); for (Replica replica : replicas) { diff --git a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java new file mode 100644 index 0000000000..33ccb137ae --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Sets; +import org.junit.Before; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.OwnershipUtils; + +import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PrepareJoinTest +{ + private static final Logger logger = LoggerFactory.getLogger(PrepareJoinTest.class); + private Random random; + private IPartitioner partitioner; + private NodeId other; + private NodeId joining; + + @Before + public void setup() + { + long seed = System.nanoTime(); + logger.info("Running test with seed {}", seed); + random = new Random(seed); + partitioner = Murmur3Partitioner.instance; + other = new NodeId(1); + joining = new NodeId(2); + } + + @Test + public void singletonTokenAlreadyAssigned() + { + ClusterMetadata metadata = metadata(); + Collection<Token> assigned = metadata.tokenMap.tokens(other); + Set<Token> toJoin = Collections.singleton(assigned.iterator().next()); + + PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true); + Transformation.Result result = prepare.execute(metadata); + assertTrue(result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + assertTrue(result.rejected().reason.startsWith("Rejecting this plan as some tokens are already assigned")); + } + + @Test + public void multipleTokensAlreadyAssigned() + { + ClusterMetadata metadata = metadata(); + Collection<Token> assigned = metadata.tokenMap.tokens(other); + Set<Token> toJoin = new HashSet<>(assigned); + + PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true); + Transformation.Result result = prepare.execute(metadata); + assertTrue(result.isRejected()); + assertEquals(ExceptionCode.INVALID, result.rejected().code); + assertTrue(result.rejected().reason.startsWith("Rejecting this plan as some tokens are already assigned")); + } + + @Test + public void noTokensAlreadyAssigned() + { + ClusterMetadata metadata = metadata(); + Collection<Token> assigned = metadata.tokenMap.tokens(other); + Set<Token> toJoin = OwnershipUtils.randomTokens(16, partitioner, random); + while (!Sets.intersection(new HashSet<>(assigned), toJoin).isEmpty()) + toJoin = OwnershipUtils.randomTokens(16, partitioner, random); + + PrepareJoin prepare = new PrepareJoin(joining, toJoin, PrepareLeaveTest.dummyPlacementProvider, true, true); + Transformation.Result result = prepare.execute(metadata); + assertTrue(result.isSuccess()); + } + + private ClusterMetadata metadata() + { + partitioner = Murmur3Partitioner.instance; + other = new NodeId(1); + joining = new NodeId(2); + Location location = new Location("dc", "rack"); + Directory directory = new Directory().unsafeWithNodeForTesting(other, nodeAddresses(random), location, NodeVersion.CURRENT) + .withNodeState(other, NodeState.JOINED) + .unsafeWithNodeForTesting(joining, nodeAddresses(random), location, NodeVersion.CURRENT) + .withNodeState(joining, NodeState.REGISTERED); + Set<Token> ownedTokens = OwnershipUtils.randomTokens(16, partitioner, random); + return ClusterMetadataTestHelper.minimalForTesting(partitioner) + .transformer() + .with(directory) + .proposeToken(other, ownedTokens) + .build().metadata; + } +} diff --git a/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java b/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java index 4607fa9618..a1b53119fd 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java @@ -151,27 +151,41 @@ public class PrepareLeaveTest public static PlacementProvider dummyPlacementProvider = new PlacementProvider() { @Override - public DataPlacements calculatePlacements(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces) { return null; } + public DataPlacements calculatePlacements(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces) + { + return null; + } @Override - public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) { return null;} + public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) + { + return noop(); + } @Override public PlacementTransitionPlan planForMove(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) { - return null; + return noop(); } @Override public PlacementTransitionPlan planForDecommission(ClusterMetadata metadata, NodeId nodeId, Keyspaces keyspaces) + { + return noop(); + } + + @Override + public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces) + { + return noop(); + } + + private PlacementTransitionPlan noop() { return new PlacementTransitionPlan(PlacementDeltas.empty(), PlacementDeltas.empty(), PlacementDeltas.empty(), PlacementDeltas.empty()); } - - @Override - public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces) { return null; } }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
