This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4fb81ea483dc8d95dac11c234ca6cb3aaeac44a8
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Mon Mar 24 13:27:38 2025 +0100

    Add nodetool command to abort failed nodetool cms initialize
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20482
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/tcm/CMSOperations.java    |  6 +++
 .../apache/cassandra/tcm/CMSOperationsMBean.java   |  1 +
 .../apache/cassandra/tcm/migration/Election.java   | 44 +++++++++++++----
 src/java/org/apache/cassandra/tools/NodeTool.java  |  3 +-
 .../apache/cassandra/tools/nodetool/CMSAdmin.java  | 13 +++++
 .../ClusterMetadataUpgradeAbortMigrationTest.java  | 55 ++++++++++++++++++++++
 .../upgrade/ClusterMetadataUpgradeTest.java        |  3 +-
 8 files changed, 115 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1360ab01bc..864e664620 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add nodetool command to abort failed nodetool cms initialize (CASSANDRA-20482)
  * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467)
  * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
  * Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466)
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index f4a608b31e..b37da9dd94 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.migration.Election;
 import org.apache.cassandra.tcm.sequences.CancelCMSReconfiguration;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
@@ -80,6 +81,11 @@ public class CMSOperations implements CMSOperationsMBean
         cms.upgradeFromGossip(ignoredEndpoints);
     }
 
+    public void abortInitialization(String initiator)
+    {
+        Election.instance.abortInitialization(initiator);
+    }
+
     @Override
     public void resumeReconfigureCms()
     {
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java
index 3b6e4b526b..1e2d9e1473 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java
@@ -25,6 +25,7 @@ import java.util.Map;
 public interface CMSOperationsMBean
 {
     public void initializeCMS(List<String> ignore);
+    public void abortInitialization(String initiator);
     public void resumeReconfigureCms();
     public void reconfigureCMS(int rf);
     public void reconfigureCMS(Map<String, Integer> rf);
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 04e1a8fa4c..69fac6f4ba 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.utils.Pair;
 public class Election
 {
     private static final Logger logger = LoggerFactory.getLogger(Election.class);
+    private static final CMSInitializationRequest.Initiator MIGRATING = new CMSInitializationRequest.Initiator(null, null);
     private static final CMSInitializationRequest.Initiator MIGRATED = new CMSInitializationRequest.Initiator(null, null);
 
     private final AtomicReference<CMSInitializationRequest.Initiator> initiator = new AtomicReference<>();
@@ -127,14 +128,21 @@ public class Election
     private void finish(Set<InetAddressAndPort> sendTo)
     {
         CMSInitializationRequest.Initiator currentInitiator = initiator.get();
-        assert currentInitiator.initiator.equals(FBUtilities.getBroadcastAddressAndPort());
-
-        Startup.initializeAsFirstCMSNode();
-        Register.maybeRegister();
-        SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID());
+        if (currentInitiator != null &&
+            Objects.equals(currentInitiator.initiator, FBUtilities.getBroadcastAddressAndPort()) &&
+            initiator.compareAndSet(currentInitiator, MIGRATING))
+        {
+            Startup.initializeAsFirstCMSNode();
+            Register.maybeRegister();
+            SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID());
 
-        updateInitiator(currentInitiator, MIGRATED);
-        MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false));
+            updateInitiator(MIGRATING, MIGRATED);
+            MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false));
+        }
+        else
+        {
+            throw new IllegalStateException("Can't finish migration, initiator="+currentInitiator);
+        }
     }
 
     private void abort(Set<InetAddressAndPort> sendTo)
@@ -166,6 +174,25 @@ public class Election
         return initiator != null && initiator != MIGRATED;
     }
 
+    public void abortInitialization(String initiatorEp)
+    {
+        InetAddressAndPort expectedInitiator = InetAddressAndPort.getByNameUnchecked(initiatorEp);
+        CMSInitializationRequest.Initiator currentInitiator = initiator.get();
+        if (currentInitiator != null && Objects.equals(currentInitiator.initiator, expectedInitiator) && initiator.compareAndSet(currentInitiator, null))
+        {
+            for (InetAddressAndPort ep : ClusterMetadata.current().directory.allJoinedEndpoints())
+            {
+                if (!ep.equals(FBUtilities.getBroadcastAddressAndPort()))
+                    messaging.send(Message.out(Verb.TCM_ABORT_MIG, currentInitiator), ep);
+            }
+        }
+        else
+        {
+            throw new IllegalStateException("Current initiator [" + currentInitiator +"] does not match provided " + expectedInitiator +
+                                            " - run this command on a node where initialization has not yet been cleared, with the correct expected initiator");
+        }
+    }
+
     public class PrepareHandler implements IVerbHandler<CMSInitializationRequest>
     {
         @Override
@@ -209,7 +236,8 @@ public class Election
         public void doVerb(Message<CMSInitializationRequest.Initiator> message) throws IOException
         {
             logger.info("Received election abort message {} from {}", message.payload, message.from());
-            if (!message.from().equals(initiator().initiator) || !updateInitiator(message.payload, null))
+            CMSInitializationRequest.Initiator initiator = message.payload;
+            if (!initiator.initiator.equals(initiator().initiator) || !updateInitiator(message.payload, null))
                 logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload);
         }
     }
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 49be441b89..d7bcc25b71 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -268,7 +268,8 @@ public class NodeTool
                .withCommand(CMSAdmin.InitializeCMS.class)
                .withCommand(CMSAdmin.ReconfigureCMS.class)
                .withCommand(CMSAdmin.Snapshot.class)
-               .withCommand(CMSAdmin.Unregister.class);
+               .withCommand(CMSAdmin.Unregister.class)
+               .withCommand(CMSAdmin.AbortInitialization.class);
 
         Cli<NodeToolCmdRunnable> parser = builder.build();
 
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
index 0ed853ba68..02cc045545 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
@@ -194,4 +194,17 @@ public abstract class CMSAdmin extends NodeTool.NodeToolCmd
             probe.getCMSOperationsProxy().unregisterLeftNodes(nodeIds);
         }
     }
+
+    @Command(name = "abortinitialization", description = "Abort an incomplete initialization")
+    public static class AbortInitialization extends NodeTool.NodeToolCmd
+    {
+        @Option(required = true, name = "--initiator", title = "Initiator", description = "The address of the node where `cms initialize` was run.")
+        public String initiator;
+
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            probe.getCMSOperationsProxy().abortInitialization(initiator);
+        }
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java
new file mode 100644
index 0000000000..bab722346d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.net.Verb;
+
+public class ClusterMetadataUpgradeAbortMigrationTest extends UpgradeTestBase
+{
+    @Test
+    public void upgradeLostNotifyTest() throws Throwable
+    {
+        new UpgradeTestBase.TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2, 3)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                                .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            cluster.filters().verbs(Verb.TCM_INIT_MIG_RSP.id,
+                                    Verb.TCM_ABORT_MIG.id).drop();
+            cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure();
+            cluster.filters().reset();
+            // it was already cleared on node1:
+            cluster.get(1).nodetoolResult("cms", "abortinitialization", "--initiator", "127.0.0.1").asserts().failure();
+            // but not on node2
+            cluster.get(2).nodetoolResult("cms", "abortinitialization", "--initiator", "127.0.0.1").asserts().success();
+
+            cluster.get(1).nodetoolResult("cms", "initialize").asserts().success();
+        }).run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
index 930933425c..eb717602f3 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
@@ -113,6 +113,5 @@ public class ClusterMetadataUpgradeTest extends UpgradeTestBase
             cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success();
             cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
         }).run();
-        }
-
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to