This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 209288cebecba15b320ccd7dd041adab78ce0ff7
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Mon Nov 10 09:15:42 2025 +0000

    Reject PrepareJoin if tokens already assigned
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21006
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |   5 +-
 .../apache/cassandra/tcm/membership/Directory.java |   3 +-
 .../cassandra/tcm/transformations/PrepareJoin.java |  13 +++
 .../service/accord/AccordTopologyUtils.java        |   2 +-
 .../reads/repair/AbstractReadRepairTest.java       |   6 +-
 .../tcm/transformations/PrepareJoinTest.java       | 130 +++++++++++++++++++++
 .../tcm/transformations/PrepareLeaveTest.java      |  26 ++++-
 8 files changed, 173 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7b7ff72e81..f0d05ba26e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006)
  * Don't update registration status if node state for decommissioned peer is 
found with the same address (CASSANDRA-21005)
  * Avoid NPE when meta keyspace placements are empty before CMS is initialized 
(CASSANDRA-21004)
  * Gossip entries for hibernating non-members don't block truncate 
(CASSANDRA-21003)
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 65cb869faf..8c962ae299 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -480,9 +480,10 @@ public class ClusterMetadata
             return this;
         }
 
-        public Transformer register(NodeId nodeId, NodeAddresses addresses, 
Location location, NodeVersion version)
+        @VisibleForTesting
+        public Transformer unsafeRegisterForTesting(NodeId nodeId, 
NodeAddresses addresses, Location location, NodeVersion version)
         {
-            directory = directory.with(nodeId, addresses, location, version);
+            directory = directory.unsafeWithNodeForTesting(nodeId, addresses, 
location, version);
             return this;
         }
 
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java 
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index b7d8684976..8e73f7f341 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -178,7 +178,8 @@ public class Directory implements MetadataValue<Directory>
         return with(addresses, location, CURRENT);
     }
 
-    public Directory with(NodeId id, NodeAddresses addresses, Location 
location, NodeVersion nodeVersion)
+    @VisibleForTesting
+    public Directory unsafeWithNodeForTesting(NodeId id, NodeAddresses 
addresses, Location location, NodeVersion nodeVersion)
     {
         return with(addresses, id, id.toUUID(), location, nodeVersion);
     }
diff --git a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java 
b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
index 02fd3d9483..55220e0cea 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/PrepareJoin.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.transformations;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 
@@ -30,6 +31,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Transformation;
@@ -132,6 +134,17 @@ public class PrepareJoin implements Transformation
         if (!ALLOWED_STATES.contains(prev.directory.peerState(nodeId)))
             return new Rejected(INVALID, String.format("Rejecting this plan as 
the node %s is in state %s",
                                                        nodeId, 
prev.directory.peerState(nodeId)));
+        Set<Token> alreadyAssigned = 
prev.tokenMap.tokens().stream().filter(tokens::contains).collect(Collectors.toSet());
+        if (!alreadyAssigned.isEmpty())
+        {
+            String assignedString = alreadyAssigned.stream()
+                                                   .map(t -> {
+                                                       NodeId n = 
prev.tokenMap.owner(t);
+                                                       InetAddressAndPort e = 
prev.directory.endpoint(n);
+                                                       return 
String.format("%s (node %s|%s)", t, n.id(), e);
+                                                   
}).collect(Collectors.joining(","));
+            return new Rejected(INVALID, String.format("Rejecting this plan as 
some tokens are already assigned: [%s]", assignedString));
+        }
 
         PlacementTransitionPlan transitionPlan = 
placementProvider.planForJoin(prev, nodeId, tokens, prev.schema.getKeyspaces());
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
index e4a2a4791c..2660f07096 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
@@ -81,7 +81,7 @@ public class AccordTopologyUtils
         NodeId nodeId = nodeId(node);
         InetAddressAndPort ep = ep(node);
         NodeAddresses addresses = new NodeAddresses(nodeId.toUUID(), ep, ep, 
ep);
-        transformer.register(nodeId, addresses, LOCATION, NodeVersion.CURRENT);
+        transformer.unsafeRegisterForTesting(nodeId, addresses, LOCATION, 
NodeVersion.CURRENT);
         transformer.withNodeState(nodeId, NodeState.JOINED);
         transformer.proposeToken(nodeId, Collections.singleton(token));
         transformer.addToRackAndDC(nodeId);
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 4a6d31447c..4e206fe5b5 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -254,9 +254,9 @@ public abstract  class AbstractReadRepairTest
         ClusterMetadataTestHelper.addEndpoint(replica1.endpoint(), new 
BytesToken(new byte[] { 0 }), dataCenter1, rack);
         ClusterMetadataTestHelper.addEndpoint(replica2.endpoint(), new 
BytesToken(new byte[] { 1 }), dataCenter1, rack);
         ClusterMetadataTestHelper.addEndpoint(replica3.endpoint(), new 
BytesToken(new byte[] { 2 }), dataCenter1, rack);
-        ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new 
BytesToken(new byte[] { 0 }), dataCenter2, rack);
-        ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new 
BytesToken(new byte[] { 1 }), dataCenter2, rack);
-        ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new 
BytesToken(new byte[] { 2 }), dataCenter2, rack);
+        ClusterMetadataTestHelper.addEndpoint(remoteReplica1.endpoint(), new 
BytesToken(new byte[] { 3 }), dataCenter2, rack);
+        ClusterMetadataTestHelper.addEndpoint(remoteReplica2.endpoint(), new 
BytesToken(new byte[] { 4 }), dataCenter2, rack);
+        ClusterMetadataTestHelper.addEndpoint(remoteReplica3.endpoint(), new 
BytesToken(new byte[] { 5 }), dataCenter2, rack);
 
         for (Replica replica : replicas)
         {
diff --git 
a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java 
b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
new file mode 100644
index 0000000000..33ccb137ae
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.OwnershipUtils;
+
+import static 
org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PrepareJoinTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PrepareJoinTest.class);
+    private Random random;
+    private IPartitioner partitioner;
+    private NodeId other;
+    private NodeId joining;
+
+    @Before
+    public void setup()
+    {
+        long seed = System.nanoTime();
+        logger.info("Running test with seed {}", seed);
+        random = new Random(seed);
+        partitioner = Murmur3Partitioner.instance;
+        other = new NodeId(1);
+        joining = new NodeId(2);
+    }
+
+    @Test
+    public void singletonTokenAlreadyAssigned()
+    {
+        ClusterMetadata metadata = metadata();
+        Collection<Token> assigned = metadata.tokenMap.tokens(other);
+        Set<Token> toJoin = Collections.singleton(assigned.iterator().next());
+
+        PrepareJoin prepare = new PrepareJoin(joining, toJoin, 
PrepareLeaveTest.dummyPlacementProvider, true, true);
+        Transformation.Result result = prepare.execute(metadata);
+        assertTrue(result.isRejected());
+        assertEquals(ExceptionCode.INVALID, result.rejected().code);
+        assertTrue(result.rejected().reason.startsWith("Rejecting this plan as 
some tokens are already assigned"));
+    }
+
+    @Test
+    public void multipleTokensAlreadyAssigned()
+    {
+        ClusterMetadata metadata = metadata();
+        Collection<Token> assigned = metadata.tokenMap.tokens(other);
+        Set<Token> toJoin = new HashSet<>(assigned);
+
+        PrepareJoin prepare = new PrepareJoin(joining, toJoin, 
PrepareLeaveTest.dummyPlacementProvider, true, true);
+        Transformation.Result result = prepare.execute(metadata);
+        assertTrue(result.isRejected());
+        assertEquals(ExceptionCode.INVALID, result.rejected().code);
+        assertTrue(result.rejected().reason.startsWith("Rejecting this plan as 
some tokens are already assigned"));
+    }
+
+    @Test
+    public void noTokensAlreadyAssigned()
+    {
+        ClusterMetadata metadata = metadata();
+        Collection<Token> assigned = metadata.tokenMap.tokens(other);
+        Set<Token> toJoin = OwnershipUtils.randomTokens(16, partitioner, 
random);
+        while (!Sets.intersection(new HashSet<>(assigned), toJoin).isEmpty())
+            toJoin = OwnershipUtils.randomTokens(16, partitioner, random);
+
+        PrepareJoin prepare = new PrepareJoin(joining, toJoin, 
PrepareLeaveTest.dummyPlacementProvider, true, true);
+        Transformation.Result result = prepare.execute(metadata);
+        assertTrue(result.isSuccess());
+    }
+
+    private ClusterMetadata metadata()
+    {
+        partitioner = Murmur3Partitioner.instance;
+        other = new NodeId(1);
+        joining = new NodeId(2);
+        Location location = new Location("dc", "rack");
+        Directory directory = new Directory().unsafeWithNodeForTesting(other, 
nodeAddresses(random), location, NodeVersion.CURRENT)
+                                             .withNodeState(other, 
NodeState.JOINED)
+                                             
.unsafeWithNodeForTesting(joining, nodeAddresses(random), location, 
NodeVersion.CURRENT)
+                                             .withNodeState(joining, 
NodeState.REGISTERED);
+        Set<Token> ownedTokens = OwnershipUtils.randomTokens(16, partitioner, 
random);
+        return ClusterMetadataTestHelper.minimalForTesting(partitioner)
+                                        .transformer()
+                                        .with(directory)
+                                        .proposeToken(other, ownedTokens)
+                                        .build().metadata;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java 
b/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java
index 4607fa9618..a1b53119fd 100644
--- a/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java
+++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareLeaveTest.java
@@ -151,27 +151,41 @@ public class PrepareLeaveTest
     public static PlacementProvider dummyPlacementProvider = new 
PlacementProvider()
     {
         @Override
-        public DataPlacements calculatePlacements(Epoch epoch, 
List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces) { 
return null; }
+        public DataPlacements calculatePlacements(Epoch epoch, 
List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces)
+        {
+            return null;
+        }
 
         @Override
-        public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, 
NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) { return null;}
+        public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, 
NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces)
+        {
+            return noop();
+        }
 
         @Override
         public PlacementTransitionPlan planForMove(ClusterMetadata metadata, 
NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces)
         {
-            return null;
+            return noop();
         }
 
         @Override
         public PlacementTransitionPlan planForDecommission(ClusterMetadata 
metadata, NodeId nodeId, Keyspaces keyspaces)
+        {
+            return noop();
+        }
+
+        @Override
+        public PlacementTransitionPlan planForReplacement(ClusterMetadata 
metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces)
+        {
+            return noop();
+        }
+
+        private PlacementTransitionPlan noop()
         {
             return new PlacementTransitionPlan(PlacementDeltas.empty(),
                                                PlacementDeltas.empty(),
                                                PlacementDeltas.empty(),
                                                PlacementDeltas.empty());
         }
-
-        @Override
-        public PlacementTransitionPlan planForReplacement(ClusterMetadata 
metadata, NodeId replaced, NodeId replacement, Keyspaces keyspaces) { return 
null; }
     };
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to